Skip to content

Commit c5b3b3b

Browse files
committed
rust: intercept module review
1 parent ce1d927 commit c5b3b3b

File tree

13 files changed

+445
-431
lines changed

13 files changed

+445
-431
lines changed

rust/bear/src/bin/wrapper.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
extern crate core;
1919

2020
use anyhow::{Context, Result};
21-
use bear::intercept::reporter::{Reporter, TcpReporter};
22-
use bear::intercept::{Event, Execution, ProcessId, KEY_DESTINATION};
21+
use bear::ipc::tcp::ReporterOnTcp;
22+
use bear::ipc::Reporter;
23+
use bear::ipc::{Event, Execution, ProcessId};
24+
use bear::modes::intercept::KEY_DESTINATION;
2325
use std::path::{Path, PathBuf};
2426

2527
/// Implementation of the wrapper process.
@@ -108,7 +110,7 @@ fn report(execution: Execution) -> Result<()> {
108110
std::env::var(KEY_DESTINATION)
109111
.with_context(|| format!("${} is missing from the environment", KEY_DESTINATION))
110112
// Create a new reporter
111-
.and_then(TcpReporter::new)
113+
.and_then(ReporterOnTcp::new)
112114
.with_context(|| "Cannot create TCP execution reporter")
113115
// Report the execution
114116
.and_then(|reporter| reporter.report(event))

rust/bear/src/fixtures.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ pub mod fixtures {
77
($($x:expr),*) => (vec![$($x.to_string()),*]);
88
}
99

10+
#[macro_export]
11+
macro_rules! map_of_strings {
12+
($($k:expr => $v:expr),* $(,)?) => {{
13+
core::convert::From::from([$(($k.to_string(), $v.to_string()),)*])
14+
}};
15+
}
16+
1017
#[macro_export]
1118
macro_rules! vec_of_pathbuf {
1219
($($x:expr),*) => (vec![$(PathBuf::from($x)),*]);

rust/bear/src/input.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::io::BufReader;
88
use std::path::PathBuf;
99

1010
use super::args;
11-
use super::intercept::Execution;
11+
use super::ipc::Execution;
1212

1313
/// Responsible for reading the build events from the intercept mode.
1414
///

rust/bear/src/intercept/collector.rs

Lines changed: 0 additions & 117 deletions
This file was deleted.

rust/bear/src/intercept/reporter.rs

Lines changed: 0 additions & 46 deletions
This file was deleted.
Lines changed: 63 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,60 @@
11
// SPDX-License-Identifier: GPL-3.0-or-later
22

3+
//! The module contains the intercept reporting and collecting functionality.
4+
//!
5+
//! When a command execution is intercepted, the interceptor sends the event to the collector.
6+
//! This happens in two different processes, requiring a communication channel between these
7+
//! processes.
8+
//!
9+
//! The module provides abstractions for the reporter and the collector. And it also defines
10+
//! the data structures that are used to represent the events.
11+
12+
use serde::{Deserialize, Serialize};
313
use std::collections::HashMap;
4-
use std::io::{Read, Write};
514
use std::path::PathBuf;
15+
use std::sync::mpsc::Sender;
616

7-
use chrono::Utc;
8-
use rand::random;
9-
use serde::{Deserialize, Serialize};
17+
pub mod tcp;
1018

11-
pub mod collector;
12-
pub mod reporter;
19+
/// Represents the remote sink of supervised process events.
20+
///
21+
/// This allows the reporters to send events to a remote collector.
22+
pub trait Reporter {
23+
fn report(&self, event: Event) -> Result<(), anyhow::Error>;
24+
}
1325

14-
/// Reporter id is a unique identifier for a reporter.
26+
/// Represents the local sink of supervised process events.
1527
///
16-
/// It is used to identify the process that sends the execution report.
17-
/// Because the OS PID is not unique across a single build (PIDs are
18-
/// recycled), we need to use a new unique identifier to identify the process.
19-
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
20-
pub struct ReporterId(pub u64);
28+
/// The collector is responsible for collecting the events from the reporters.
29+
///
30+
/// To share the collector between threads, we use the `Arc` type to wrap the
31+
/// collector. This way we can clone the collector and send it to other threads.
32+
pub trait Collector {
33+
/// Returns the address of the collector.
34+
///
35+
/// The address is in the format of `ip:port`.
36+
fn address(&self) -> String;
2137

22-
impl ReporterId {
23-
pub fn new() -> Self {
24-
let id = random::<u64>();
25-
ReporterId(id)
26-
}
27-
}
38+
/// Collects the events from the reporters.
39+
///
40+
/// The events are sent to the given destination channel.
41+
///
42+
/// The function returns when the collector is stopped. The collector is stopped
43+
/// when the `stop` method invoked (from another thread).
44+
fn collect(&self, destination: Sender<Envelope>) -> Result<(), anyhow::Error>;
2845

29-
/// Process id is a OS identifier for a process.
30-
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
31-
pub struct ProcessId(pub u32);
46+
/// Stops the collector.
47+
fn stop(&self) -> Result<(), anyhow::Error>;
48+
}
3249

33-
/// Execution is a representation of a process execution.
50+
/// Envelope is a wrapper around the event.
3451
///
35-
/// It does not contain information about the outcome of the execution,
36-
/// like the exit code or the duration of the execution. It only contains
37-
/// the information that is necessary to reproduce the execution.
52+
/// It contains the reporter id, the timestamp of the event and the event itself.
3853
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
39-
pub struct Execution {
40-
pub executable: PathBuf,
41-
pub arguments: Vec<String>,
42-
pub working_dir: PathBuf,
43-
pub environment: HashMap<String, String>,
54+
pub struct Envelope {
55+
pub rid: ReporterId,
56+
pub timestamp: u64,
57+
pub event: Event,
4458
}
4559

4660
/// Represent a relevant life cycle event of a process.
@@ -54,58 +68,27 @@ pub struct Event {
5468
pub execution: Execution,
5569
}
5670

57-
/// Envelope is a wrapper around the event.
71+
/// Execution is a representation of a process execution.
5872
///
59-
/// It contains the reporter id, the timestamp of the event and the event itself.
73+
/// It does not contain information about the outcome of the execution,
74+
/// like the exit code or the duration of the execution. It only contains
75+
/// the information that is necessary to reproduce the execution.
6076
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
61-
pub struct Envelope {
62-
pub rid: ReporterId,
63-
pub timestamp: u64,
64-
pub event: Event,
77+
pub struct Execution {
78+
pub executable: PathBuf,
79+
pub arguments: Vec<String>,
80+
pub working_dir: PathBuf,
81+
pub environment: HashMap<String, String>,
6582
}
6683

67-
impl Envelope {
68-
pub fn new(rid: &ReporterId, event: Event) -> Self {
69-
let timestamp = Utc::now().timestamp_millis() as u64;
70-
Envelope {
71-
rid: rid.clone(),
72-
timestamp,
73-
event,
74-
}
75-
}
76-
77-
/// Read an envelope from a reader using TLV format.
78-
///
79-
/// The envelope is serialized using JSON and the length of the JSON
80-
/// is written as a 4 byte big-endian integer before the JSON.
81-
pub fn read_from(reader: &mut impl Read) -> Result<Self, anyhow::Error> {
82-
let mut length_bytes = [0; 4];
83-
reader.read_exact(&mut length_bytes)?;
84-
let length = u32::from_be_bytes(length_bytes) as usize;
85-
86-
let mut buffer = vec![0; length];
87-
reader.read_exact(&mut buffer)?;
88-
let envelope = serde_json::from_slice(buffer.as_ref())?;
89-
90-
Ok(envelope)
91-
}
92-
93-
/// Write an envelope to a writer using TLV format.
94-
///
95-
/// The envelope is serialized using JSON and the length of the JSON
96-
/// is written as a 4 byte big-endian integer before the JSON.
97-
pub fn write_into(&self, writer: &mut impl Write) -> Result<u32, anyhow::Error> {
98-
let serialized_envelope = serde_json::to_string(&self)?;
99-
let bytes = serialized_envelope.into_bytes();
100-
let length = bytes.len() as u32;
101-
102-
writer.write_all(&length.to_be_bytes())?;
103-
writer.write_all(&bytes)?;
104-
105-
Ok(length)
106-
}
107-
}
84+
/// Reporter id is a unique identifier for a reporter.
85+
///
86+
/// It is used to identify the process that sends the execution report.
87+
/// Because the OS PID is not unique across a single build (PIDs are
88+
/// recycled), we need to use a new unique identifier to identify the process.
89+
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
90+
pub struct ReporterId(pub u64);
10891

109-
/// Declare the environment variable name for the reporter address.
110-
pub const KEY_DESTINATION: &str = "INTERCEPT_REPORTER_ADDRESS";
111-
pub const KEY_PRELOAD_PATH: &str = "LD_PRELOAD";
92+
/// Process id is a OS identifier for a process.
93+
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
94+
pub struct ProcessId(pub u32);

0 commit comments

Comments
 (0)