-
Notifications
You must be signed in to change notification settings - Fork 25
Expand file tree
/
Copy pathevent_writer.rs
More file actions
100 lines (84 loc) · 3.07 KB
/
event_writer.rs
File metadata and controls
100 lines (84 loc) · 3.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
use super::shared_state::SharedState;
use crate::telemetry::collector::Batch;
use crate::telemetry::writer::TraceWriter;
/// Intermediate layer between the recorder and the raw `TraceWriter`.
///
/// Owns the writer and keeps a running count of the events written through
/// it. Its API is roughly:
///
/// - `write_raw_event(event)` — encode and write a single event (test only)
/// - `write_encoded_batch(batch)` — write an already-encoded batch and count its events
/// - `flush_sources(shared)` — drain data sources into the trace
/// - `flush()` — flush the underlying writer
///
/// The remaining methods are thin pass-throughs to the wrapped writer.
pub(crate) struct EventWriter {
/// The wrapped trace writer; `pub(super)` so the parent module can reach it directly.
pub(super) writer: Box<dyn TraceWriter>,
/// Running total of events written through this wrapper. Starts at zero and
/// is only increased after a write to `writer` has succeeded.
events_written: u64,
}
impl EventWriter {
    /// Wrap `writer` in a new `EventWriter` whose event counter starts at zero.
    pub(crate) fn new(writer: Box<dyn TraceWriter>) -> Self {
        Self {
            events_written: 0,
            writer,
        }
    }

    /// Total number of events counted by this wrapper so far.
    ///
    /// The counter only advances after a write issued through this type has
    /// succeeded.
    pub(crate) fn events_written(&self) -> u64 {
        self.events_written
    }

    // TODO: delete/refactor this method, it is only used in tests.
    /// Encode one event, wrap it in a single-event `Batch`, and write it.
    ///
    /// Only compiled for test builds with the `cpu-profiling` feature enabled.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the underlying writer; the event
    /// counter is left unchanged in that case.
    #[cfg(all(test, feature = "cpu-profiling"))]
    pub(crate) fn write_raw_event(
        &mut self,
        event: &dyn crate::telemetry::buffer::Encodable,
    ) -> std::io::Result<()> {
        use crate::telemetry::buffer::ThreadLocalBuffer;

        let single_event_batch = Batch {
            encoded_bytes: ThreadLocalBuffer::encode_single(event),
            event_count: 1,
        };
        // Reuse the regular batch path so writing and counting live in one place;
        // with `event_count == 1` this bumps the counter by exactly one.
        self.write_encoded_batch(&single_event_batch)
    }

    /// Hand `batch` to the underlying writer and, once that succeeds, add the
    /// batch's `event_count` to the running total.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error from the writer; the total is not updated when
    /// the write fails.
    pub(crate) fn write_encoded_batch(&mut self, batch: &Batch) -> std::io::Result<()> {
        self.writer.write_encoded_batch(batch)?;
        self.events_written += batch.event_count;
        Ok(())
    }

    /// Ask every data source registered in `shared` to flush itself.
    ///
    /// Each source receives a `FlushContext` that borrows the shared collector
    /// and drain epoch together with a snapshot of the thread roles. This
    /// method does not touch the wrapped writer or the event counter itself.
    ///
    /// # Panics
    ///
    /// Panics if locking `shared.thread_roles` or `shared.sources` returns an
    /// error.
    pub(crate) fn flush_sources(&mut self, shared: &SharedState) {
        use super::source::FlushContext;

        // Clone the roles inside a short scope so the `thread_roles` guard is
        // released before the `sources` lock is taken.
        let roles_snapshot = {
            let roles_guard = shared.thread_roles.lock().unwrap();
            roles_guard.clone()
        };
        let flush_ctx = FlushContext {
            collector: &shared.collector,
            drain_epoch: &shared.drain_epoch,
            thread_roles: &roles_snapshot,
        };

        let mut registered = shared.sources.lock().unwrap();
        for data_source in registered.iter_mut() {
            data_source.flush(&flush_ctx);
        }
    }

    /// Borrow the segment metadata entries as reported by the underlying writer.
    pub(crate) fn segment_metadata(&self) -> &[(String, String)] {
        self.writer.segment_metadata()
    }

    /// Pass `entries` to the underlying writer's `update_segment_metadata`.
    pub(crate) fn update_segment_metadata(&mut self, entries: Vec<(String, String)>) {
        self.writer.update_segment_metadata(entries);
    }

    /// Forward to the underlying writer's `write_current_segment_metadata`,
    /// returning its I/O result unchanged.
    pub(crate) fn write_current_segment_metadata(&mut self) -> std::io::Result<()> {
        self.writer.write_current_segment_metadata()
    }

    /// Flush the underlying writer, returning its I/O result unchanged.
    pub(crate) fn flush(&mut self) -> std::io::Result<()> {
        self.writer.flush()
    }

    /// Report the underlying writer's `should_drain()` answer.
    pub(crate) fn should_drain(&self) -> bool {
        self.writer.should_drain()
    }

    /// Forward to the underlying writer's `drained()`; the meaning of the
    /// returned flag is defined by the `TraceWriter` implementation.
    pub(crate) fn drained(&mut self) -> std::io::Result<bool> {
        self.writer.drained()
    }

    /// Forward to the underlying writer's `finalize()`, returning its I/O
    /// result unchanged.
    pub(crate) fn finalize(&mut self) -> std::io::Result<()> {
        self.writer.finalize()
    }
}