Skip to content

Commit 9e2e3fd

Browse files
committed
rearrange intercept module
1 parent b92f73c commit 9e2e3fd

File tree

7 files changed

+180
-184
lines changed

7 files changed

+180
-184
lines changed

bear/src/intercept/mod.rs

Lines changed: 160 additions & 161 deletions
Original file line numberDiff line numberDiff line change
@@ -20,145 +20,12 @@ use std::process::ExitCode;
2020
use std::sync::mpsc::{channel, Receiver, Sender};
2121
use std::sync::Arc;
2222
use std::{fmt, thread};
23-
use supervise::supervise;
2423
use thiserror::Error;
2524

26-
/// Errors that can occur in the reporter.
27-
#[derive(Error, Debug)]
28-
pub enum ReporterError {
29-
#[error("Environment variable '{0}' is missing")]
30-
MissingEnvironmentVariable(String),
31-
#[error("Network error: {0}")]
32-
Network(String),
33-
#[error("Serialization error: {0}")]
34-
Serialization(#[from] serde_json::Error),
35-
#[error("IO error: {0}")]
36-
Io(#[from] std::io::Error),
37-
}
38-
39-
/// Errors that can occur in the collector.
40-
#[derive(Error, Debug)]
41-
pub enum CollectorError {
42-
#[error("IO error: {0}")]
43-
Io(#[from] std::io::Error),
44-
#[error("Network error: {0}")]
45-
Network(String),
46-
#[error("Serialization error: {0}")]
47-
Serialization(#[from] serde_json::Error),
48-
#[error("Channel communication error: {0}")]
49-
Channel(String),
50-
}
51-
52-
impl<T> From<std::sync::mpsc::SendError<T>> for CollectorError {
53-
fn from(err: std::sync::mpsc::SendError<T>) -> Self {
54-
CollectorError::Channel(format!("Failed to send message: {}", err))
55-
}
56-
}
57-
58-
/// Errors that can occur in the intercept environment and configuration.
59-
#[derive(Error, Debug)]
60-
pub enum InterceptError {
61-
#[error("IO error: {0}")]
62-
Io(#[from] std::io::Error),
63-
#[error("Process execution error: {0}")]
64-
ProcessExecution(String),
65-
#[error("Configuration validation error: {0}")]
66-
ConfigValidation(String),
67-
#[error("Path manipulation error: {0}")]
68-
Path(String),
69-
#[error("Thread join error")]
70-
ThreadJoin,
71-
#[error("No executable found in build command")]
72-
NoExecutable,
73-
#[error("Reporter error: {0}")]
74-
Reporter(#[from] ReporterError),
75-
#[error("Collector error: {0}")]
76-
Collector(#[from] CollectorError),
77-
}
78-
79-
impl From<std::env::JoinPathsError> for InterceptError {
80-
fn from(err: std::env::JoinPathsError) -> Self {
81-
InterceptError::Path(format!("Failed to join paths: {}", err))
82-
}
83-
}
84-
8525
/// Declare the environment variables used by the intercept mode.
8626
pub const KEY_DESTINATION: &str = "INTERCEPT_COLLECTOR_ADDRESS";
8727
const KEY_PRELOAD_PATH: &str = "LD_PRELOAD";
8828

89-
/// Creates a new reporter instance.
90-
///
91-
/// This function is supposed to be called from another process than the collector.
92-
/// Therefore, the reporter destination is passed as an environment variable.
93-
/// The reporter destination is the address of the collector service.
94-
pub fn create_reporter() -> Result<Box<dyn Reporter>, ReporterError> {
95-
let address = std::env::var(KEY_DESTINATION)
96-
.map_err(|_| ReporterError::MissingEnvironmentVariable(KEY_DESTINATION.to_string()))?;
97-
let reporter = tcp::ReporterOnTcp::new(address);
98-
Ok(Box::new(reporter))
99-
}
100-
101-
pub fn create_reporter_on_tcp(address: &str) -> Box<dyn Reporter> {
102-
let reporter = tcp::ReporterOnTcp::new(address.to_string());
103-
Box::new(reporter)
104-
}
105-
106-
/// Represents the remote sink of supervised process events.
107-
///
108-
/// This allows the reporters to send events to a remote collector.
109-
pub trait Reporter {
110-
fn report(&self, event: Event) -> Result<(), ReporterError>;
111-
}
112-
113-
/// Represents the local sink of supervised process events.
114-
///
115-
/// The collector is responsible for collecting the events from the reporters.
116-
///
117-
/// To share the collector between threads, we use the `Arc` type to wrap the
118-
/// collector. This way we can clone the collector and send it to other threads.
119-
pub trait Collector {
120-
/// Returns the address of the collector.
121-
///
122-
/// The address is in the format of `ip:port`.
123-
fn address(&self) -> String;
124-
125-
/// Collects the events from the reporters.
126-
///
127-
/// The events are sent to the given destination channel.
128-
///
129-
/// The function returns when the collector is stopped. The collector is stopped
130-
/// when the `stop` method invoked (from another thread).
131-
fn collect(&self, destination: Sender<Event>) -> Result<(), CollectorError>;
132-
133-
/// Stops the collector.
134-
fn stop(&self) -> Result<(), CollectorError>;
135-
}
136-
137-
/// Represent a relevant life cycle event of a process.
138-
///
139-
/// In the current implementation, we only have one event, the `Started` event.
140-
/// This event is sent when a process is started. It contains the process id
141-
/// and the execution information.
142-
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
143-
pub struct Event {
144-
pub pid: u32,
145-
pub execution: Execution,
146-
}
147-
148-
impl Event {
149-
/// Creates a new event that is originated from the current process.
150-
pub fn new(execution: Execution) -> Self {
151-
let pid = std::process::id();
152-
Event { pid, execution }
153-
}
154-
}
155-
156-
impl fmt::Display for Event {
157-
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
158-
write!(f, "Event pid={}, execution={}", self.pid, self.execution)
159-
}
160-
}
161-
16229
/// Execution is a representation of a process execution.
16330
///
16431
/// It does not contain information about the outcome of the execution,
@@ -199,6 +66,24 @@ impl Execution {
19966
updated.environment = environment;
20067
updated
20168
}
69+
70+
#[cfg(test)]
71+
pub fn from_strings(
72+
executable: &str,
73+
arguments: Vec<&str>,
74+
working_dir: &str,
75+
environment: HashMap<&str, &str>,
76+
) -> Self {
77+
Self {
78+
executable: PathBuf::from(executable),
79+
arguments: arguments.iter().map(|s| s.to_string()).collect(),
80+
working_dir: PathBuf::from(working_dir),
81+
environment: environment
82+
.iter()
83+
.map(|(k, v)| (k.to_string(), v.to_string()))
84+
.collect(),
85+
}
86+
}
20287
}
20388

20489
impl TryFrom<args::BuildCommand> for Execution {
@@ -246,38 +131,93 @@ impl fmt::Display for Execution {
246131
}
247132
}
248133

249-
#[cfg(test)]
250-
pub fn execution(
251-
executable: &str,
252-
arguments: Vec<&str>,
253-
working_dir: &str,
254-
environment: HashMap<&str, &str>,
255-
) -> Execution {
256-
Execution {
257-
executable: PathBuf::from(executable),
258-
arguments: arguments.iter().map(|s| s.to_string()).collect(),
259-
working_dir: PathBuf::from(working_dir),
260-
environment: environment
261-
.iter()
262-
.map(|(k, v)| (k.to_string(), v.to_string()))
263-
.collect(),
134+
/// Represent a relevant life cycle event of a process.
135+
///
136+
/// In the current implementation, we only have one event, the `Started` event.
137+
/// This event is sent when a process is started. It contains the process id
138+
/// and the execution information.
139+
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
140+
pub struct Event {
141+
pub pid: u32,
142+
pub execution: Execution,
143+
}
144+
145+
impl Event {
146+
/// Creates a new event that is originated from the current process.
147+
pub fn new(execution: Execution) -> Self {
148+
let pid = std::process::id();
149+
Event { pid, execution }
150+
}
151+
152+
#[cfg(test)]
153+
pub fn from_strings(
154+
pid: u32,
155+
executable: &str,
156+
arguments: Vec<&str>,
157+
working_dir: &str,
158+
environment: HashMap<&str, &str>,
159+
) -> Self {
160+
Self {
161+
pid,
162+
execution: Execution::from_strings(executable, arguments, working_dir, environment),
163+
}
264164
}
265165
}
266166

267-
#[cfg(test)]
268-
pub fn event(
269-
pid: u32,
270-
executable: &str,
271-
arguments: Vec<&str>,
272-
working_dir: &str,
273-
environment: HashMap<&str, &str>,
274-
) -> Event {
275-
Event {
276-
pid,
277-
execution: execution(executable, arguments, working_dir, environment),
167+
impl fmt::Display for Event {
168+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
169+
write!(f, "Event pid={}, execution={}", self.pid, self.execution)
278170
}
279171
}
280172

173+
/// Creates a new reporter instance.
174+
///
175+
/// This function is supposed to be called from another process than the collector.
176+
/// Therefore, the reporter destination is passed as an environment variable.
177+
/// The reporter destination is the address of the collector service.
178+
pub fn create_reporter() -> Result<Box<dyn Reporter>, ReporterError> {
179+
let address = std::env::var(KEY_DESTINATION)
180+
.map_err(|_| ReporterError::MissingEnvironmentVariable(KEY_DESTINATION.to_string()))?;
181+
let reporter = tcp::ReporterOnTcp::new(address);
182+
Ok(Box::new(reporter))
183+
}
184+
185+
pub fn create_reporter_on_tcp(address: &str) -> Box<dyn Reporter> {
186+
let reporter = tcp::ReporterOnTcp::new(address.to_string());
187+
Box::new(reporter)
188+
}
189+
190+
/// Represents the remote sink of supervised process events.
191+
///
192+
/// This allows the reporters to send events to a remote collector.
193+
pub trait Reporter {
194+
fn report(&self, event: Event) -> Result<(), ReporterError>;
195+
}
196+
197+
/// Represents the local sink of supervised process events.
198+
///
199+
/// The collector is responsible for collecting the events from the reporters.
200+
///
201+
/// To share the collector between threads, we use the `Arc` type to wrap the
202+
/// collector. This way we can clone the collector and send it to other threads.
203+
pub trait Collector {
204+
/// Returns the address of the collector.
205+
///
206+
/// The address is in the format of `ip:port`.
207+
fn address(&self) -> String;
208+
209+
/// Collects the events from the reporters.
210+
///
211+
/// The events are sent to the given destination channel.
212+
///
213+
/// The function returns when the collector is stopped. The collector is stopped
214+
/// when the `stop` method invoked (from another thread).
215+
fn collect(&self, destination: Sender<Event>) -> Result<(), CollectorError>;
216+
217+
/// Stops the collector.
218+
fn stop(&self) -> Result<(), CollectorError>;
219+
}
220+
281221
/// The service is responsible for collecting the events from the supervised processes.
282222
///
283223
/// The service is implemented as a TCP server that listens to on a random port on the loopback
@@ -419,8 +359,8 @@ impl InterceptEnvironment {
419359

420360
let child: Execution = TryInto::<Execution>::try_into(input)?
421361
.with_environment(self.environment().into_iter().collect());
422-
let exit_status =
423-
supervise(child).map_err(|e| InterceptError::ProcessExecution(e.to_string()))?;
362+
let exit_status = supervise::supervise(child)
363+
.map_err(|e| InterceptError::ProcessExecution(e.to_string()))?;
424364
log::info!("Execution finished with status: {:?}", exit_status);
425365

426366
// The exit code is not always available. When the process is killed by a signal,
@@ -526,6 +466,65 @@ impl config::Intercept {
526466
}
527467
}
528468

469+
/// Errors that can occur in the reporter.
470+
#[derive(Error, Debug)]
471+
pub enum ReporterError {
472+
#[error("Environment variable '{0}' is missing")]
473+
MissingEnvironmentVariable(String),
474+
#[error("Network error: {0}")]
475+
Network(String),
476+
#[error("Serialization error: {0}")]
477+
Serialization(#[from] serde_json::Error),
478+
#[error("IO error: {0}")]
479+
Io(#[from] std::io::Error),
480+
}
481+
482+
/// Errors that can occur in the collector.
483+
#[derive(Error, Debug)]
484+
pub enum CollectorError {
485+
#[error("IO error: {0}")]
486+
Io(#[from] std::io::Error),
487+
#[error("Network error: {0}")]
488+
Network(String),
489+
#[error("Serialization error: {0}")]
490+
Serialization(#[from] serde_json::Error),
491+
#[error("Channel communication error: {0}")]
492+
Channel(String),
493+
}
494+
495+
impl<T> From<std::sync::mpsc::SendError<T>> for CollectorError {
496+
fn from(err: std::sync::mpsc::SendError<T>) -> Self {
497+
CollectorError::Channel(format!("Failed to send message: {}", err))
498+
}
499+
}
500+
501+
/// Errors that can occur in the intercept environment and configuration.
502+
#[derive(Error, Debug)]
503+
pub enum InterceptError {
504+
#[error("IO error: {0}")]
505+
Io(#[from] std::io::Error),
506+
#[error("Process execution error: {0}")]
507+
ProcessExecution(String),
508+
#[error("Configuration validation error: {0}")]
509+
ConfigValidation(String),
510+
#[error("Path manipulation error: {0}")]
511+
Path(String),
512+
#[error("Thread join error")]
513+
ThreadJoin,
514+
#[error("No executable found in build command")]
515+
NoExecutable,
516+
#[error("Reporter error: {0}")]
517+
Reporter(#[from] ReporterError),
518+
#[error("Collector error: {0}")]
519+
Collector(#[from] CollectorError),
520+
}
521+
522+
impl From<std::env::JoinPathsError> for InterceptError {
523+
fn from(err: std::env::JoinPathsError) -> Self {
524+
InterceptError::Path(format!("Failed to join paths: {}", err))
525+
}
526+
}
527+
529528
#[cfg(test)]
530529
mod test {
531530
use super::*;

bear/src/intercept/tcp.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -231,28 +231,27 @@ mod tests {
231231
}
232232

233233
mod fixtures {
234-
use super::super::super::event;
235234
use super::*;
236235
use std::collections::HashMap;
237236

238237
pub(super) static EVENTS: std::sync::LazyLock<Vec<Event>> =
239238
std::sync::LazyLock::new(|| {
240239
vec![
241-
event(
240+
Event::from_strings(
242241
3425,
243242
"/usr/bin/ls",
244243
vec!["ls", "-l"],
245244
"/tmp",
246245
HashMap::new(),
247246
),
248-
event(
247+
Event::from_strings(
249248
3492,
250249
"/usr/bin/cc",
251250
vec!["cc", "-c", "./file_a.c", "-o", "./file_a.o"],
252251
"/home/user",
253252
HashMap::from([("PATH", "/usr/bin:/bin"), ("HOME", "/home/user")]),
254253
),
255-
event(
254+
Event::from_strings(
256255
3522,
257256
"/usr/bin/ld",
258257
vec!["ld", "-o", "./file_a", "./file_a.o"],

0 commit comments

Comments
 (0)