Skip to content

Commit 22ac8af

Browse files
committed
rust: intercept wrapper does not fail when reporting fails
1 parent e330c51 commit 22ac8af

File tree

5 files changed

+106
-54
lines changed

5 files changed

+106
-54
lines changed

rust/intercept/src/bin/wrapper.rs

Lines changed: 47 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,25 @@
1919

2020
extern crate core;
2121

22-
use std::path::PathBuf;
23-
use anyhow::Result;
22+
use anyhow::{Context, Result};
2423
use intercept::reporter::{Reporter, TcpReporter};
24+
use intercept::KEY_DESTINATION;
25+
use std::path::{Path, PathBuf};
2526

2627
fn main() -> Result<()> {
28+
env_logger::init();
2729
// Find out what is the executable name the execution was started with
2830
let executable = std::env::args().next().unwrap();
31+
log::info!("Executable as called: {:?}", executable);
2932
// Read the PATH variable and find the next executable with the same name
30-
let real_executable = std::env::var("PATH")?
31-
.split(':')
32-
.map(|dir| std::path::Path::new(dir).join(&executable))
33-
.filter(|path| path.exists())
34-
.nth(1)
35-
.ok_or_else(|| anyhow::anyhow!("Cannot find the real executable"))?;
36-
// TODO: ^ This is a very naive way to find the real executable.
37-
// Make sure we don't call ourselves.
33+
let real_executable = next_in_path(&executable)?;
34+
log::info!("Executable to call: {:?}", real_executable);
3835

3936
// Report the execution with the real executable
40-
report_execution(&real_executable);
37+
match into_execution(&real_executable).and_then(report) {
38+
Ok(_) => log::info!("Execution reported"),
39+
Err(e) => log::error!("Execution reporting failed: {}", e),
40+
}
4141

4242
// Execute the real executable with the same arguments
4343
let status = std::process::Command::new(real_executable)
@@ -47,29 +47,44 @@ fn main() -> Result<()> {
4747
std::process::exit(status.code().unwrap_or(1));
4848
}
4949

50-
// TODO: Current error handling is very basic, it just panics on any error.
51-
// More sophisticated error handling can be: logging the error and return.
52-
fn report_execution(path_buf: &PathBuf) {
53-
// Get the reporter address from the environment
54-
let reporter_address = std::env::var(INTERCEPT_REPORTER_ADDRESS)
55-
.expect(format!("${} is not set", INTERCEPT_REPORTER_ADDRESS).as_str());
56-
// Create a new reporter
57-
let reporter = TcpReporter::new(reporter_address)
58-
.expect("Cannot create reporter");
50+
// TODO: This is a very naive way to find the real executable.
51+
// Make sure we don't call ourselves.
52+
fn next_in_path(executable: &String) -> Result<PathBuf> {
53+
let real_executable = std::env::var("PATH")?
54+
.split(':')
55+
.map(|dir| Path::new(dir).join(&executable))
56+
.filter(|path| path.exists())
57+
.nth(1)
58+
.ok_or_else(|| anyhow::anyhow!("Cannot find the real executable"))?;
59+
Ok(real_executable)
60+
}
5961

60-
// Report the execution
61-
let execution = intercept::Event {
62+
fn report(execution: intercept::Execution) -> Result<()> {
63+
let event = intercept::Event {
6264
pid: intercept::ProcessId(std::process::id() as u32),
63-
execution: intercept::Execution {
64-
executable: path_buf.clone(),
65-
arguments: std::env::args().collect(),
66-
working_dir: std::env::current_dir().expect("Cannot get current directory"),
67-
environment: std::env::vars().collect(),
68-
},
65+
execution,
6966
};
70-
reporter.report(execution)
71-
.expect("Cannot report execution");
67+
68+
// Get the reporter address from the environment
69+
std::env::var(KEY_DESTINATION)
70+
.with_context(|| format!("${} is missing from the environment", KEY_DESTINATION))
71+
// Create a new reporter
72+
.and_then(|reporter_address| TcpReporter::new(reporter_address))
73+
.with_context(|| "Cannot create TCP execution reporter")
74+
// Report the execution
75+
.and_then(|reporter| reporter.report(event))
76+
.with_context(|| "Sending execution failed")?;
77+
78+
Ok(())
7279
}
7380

74-
// declare a const string for the INTERCEPT_REPORTER_ADDRESS environment name
75-
const INTERCEPT_REPORTER_ADDRESS: &str = "INTERCEPT_REPORTER_ADDRESS";
81+
fn into_execution(path_buf: &Path) -> Result<intercept::Execution> {
82+
std::env::current_dir()
83+
.with_context(|| "Cannot get current directory")
84+
.map(|working_dir| intercept::Execution {
85+
executable: path_buf.to_path_buf(),
86+
arguments: std::env::args().collect(),
87+
working_dir,
88+
environment: std::env::vars().collect(),
89+
})
90+
}

rust/intercept/src/collector.rs

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,32 @@
2020
use std::net::{SocketAddr, TcpListener, TcpStream};
2121

2222
use crossbeam::channel::Sender;
23-
use serde::{Deserialize, Serialize};
2423
use std::sync::atomic::{AtomicBool, Ordering};
2524
use std::sync::Arc;
2625

2726
use super::Envelope;
2827

29-
#[derive(Serialize, Deserialize, Debug, PartialEq)]
30-
pub struct SessionLocator(pub String);
31-
28+
/// Represents the local sink of supervised process events.
29+
///
30+
/// The collector is responsible for collecting the events from the reporters.
31+
///
32+
/// To share the collector between threads, we use the `Arc` type to wrap the
33+
/// collector. This way we can clone the collector and send it to other threads.
3234
pub trait EventCollector {
33-
fn address(&self) -> SessionLocator;
35+
/// Returns the address of the collector.
36+
///
37+
/// The address is in the format of `ip:port`.
38+
fn address(&self) -> String;
39+
40+
/// Collects the events from the reporters.
41+
///
42+
/// The events are sent to the given destination channel.
43+
///
44+
/// The function returns when the collector is stopped. The collector is stopped
45+
/// when the `stop` method invoked (from another thread).
3446
fn collect(&self, destination: Sender<Envelope>) -> Result<(), anyhow::Error>;
47+
48+
/// Stops the collector.
3549
fn stop(&self) -> Result<(), anyhow::Error>;
3650
}
3751

@@ -42,6 +56,10 @@ pub struct EventCollectorOnTcp {
4256
}
4357

4458
impl EventCollectorOnTcp {
59+
/// Creates a new TCP event collector.
60+
///
61+
/// The collector listens on a random port on the loopback interface.
62+
/// The address of the collector can be obtained by the `address` method.
4563
pub fn new() -> Result<Self, anyhow::Error> {
4664
let shutdown = Arc::new(AtomicBool::new(false));
4765
let listener = TcpListener::bind("127.0.0.1:0")?;
@@ -69,12 +87,18 @@ impl EventCollectorOnTcp {
6987
}
7088

7189
impl EventCollector for EventCollectorOnTcp {
72-
fn address(&self) -> SessionLocator {
73-
SessionLocator(self.address.to_string())
90+
fn address(&self) -> String {
91+
self.address.to_string()
7492
}
7593

94+
/// Single-threaded implementation of the collector.
95+
///
96+
/// The collector listens on the TCP port and accepts incoming connections.
97+
/// When a connection is accepted, the collector reads the events from the
98+
/// connection and sends them to the destination channel.
7699
fn collect(&self, destination: Sender<Envelope>) -> Result<(), anyhow::Error> {
77100
for stream in self.listener.incoming() {
101+
// This has to be the first thing to do, in order to implement the stop method!
78102
if self.shutdown.load(Ordering::Relaxed) {
79103
break;
80104
}
@@ -97,6 +121,11 @@ impl EventCollector for EventCollectorOnTcp {
97121
Ok(())
98122
}
99123

124+
/// Stops the collector by flipping the shutdown flag and connecting to the collector.
125+
///
126+
/// The collector is stopped when the `collect` method sees the shutdown flag.
127+
/// To signal the collector to stop, we connect to the collector to unblock the
128+
/// `accept` call to check the shutdown flag.
100129
fn stop(&self) -> Result<(), anyhow::Error> {
101130
self.shutdown.store(true, Ordering::Relaxed);
102131
let _ = TcpStream::connect(self.address)?;

rust/intercept/src/lib.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::io::{Read, Write};
2121
use std::path::PathBuf;
2222

2323
use chrono::Utc;
24+
use rand::random;
2425
use serde::{Deserialize, Serialize};
2526

2627
pub mod collector;
@@ -34,6 +35,13 @@ pub mod reporter;
3435
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
3536
pub struct ReporterId(pub u64);
3637

38+
impl ReporterId {
39+
pub fn new() -> Self {
40+
let id = random::<u64>();
41+
ReporterId(id)
42+
}
43+
}
44+
3745
/// Process id is a OS identifier for a process.
3846
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
3947
pub struct ProcessId(pub u32);
@@ -113,3 +121,6 @@ impl Envelope {
113121
Ok(length)
114122
}
115123
}
124+
125+
/// Declare the environment variable name for the reporter address.
126+
pub const KEY_DESTINATION: &str = "INTERCEPT_REPORTER_ADDRESS";

rust/intercept/src/reporter.rs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,11 @@
1919

2020
use std::net::TcpStream;
2121

22-
use rand::random;
23-
2422
use super::{Envelope, Event, ReporterId};
2523

26-
impl ReporterId {
27-
pub fn new() -> Self {
28-
let id = random::<u64>();
29-
ReporterId(id)
30-
}
31-
}
32-
33-
// Represents the remote sink of supervised process events.
34-
//
35-
// Events from a process execution can be sent from many actors (mostly
36-
// supervisor processes). The events are collected in a common place
37-
// in order to reconstruct of final report of a build process.
24+
/// Represents the remote sink of supervised process events.
25+
///
26+
/// This allows the reporters to send events to a remote collector.
3827
pub trait Reporter {
3928
fn report(&self, event: Event) -> Result<(), anyhow::Error>;
4029
}
@@ -45,6 +34,10 @@ pub struct TcpReporter {
4534
}
4635

4736
impl TcpReporter {
37+
/// Creates a new TCP reporter instance.
38+
///
39+
/// It does not open the TCP connection yet. Stores the destination
40+
/// address and creates a unique reporter id.
4841
pub fn new(destination: String) -> Result<Self, anyhow::Error> {
4942
let reporter_id = ReporterId::new();
5043
let result = TcpReporter {
@@ -56,6 +49,10 @@ impl TcpReporter {
5649
}
5750

5851
impl Reporter for TcpReporter {
52+
/// Sends an event to the remote collector.
53+
///
54+
/// The event is wrapped in an envelope and sent to the remote collector.
55+
/// The TCP connection is opened and closed for each event.
5956
fn report(&self, event: Event) -> Result<(), anyhow::Error> {
6057
let envelope = Envelope::new(&self.reporter_id, event);
6158
let mut socket = TcpStream::connect(self.destination.clone())?;

rust/intercept/tests/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ mod test {
2323
#[test]
2424
fn tcp_reporter_and_collectors_work() {
2525
let collector = EventCollectorOnTcp::new().unwrap();
26-
let reporter = TcpReporter::new(collector.address().0).unwrap();
26+
let reporter = TcpReporter::new(collector.address()).unwrap();
2727

2828
// Create wrapper to share the collector across threads.
2929
let thread_collector = Arc::new(collector);

0 commit comments

Comments
 (0)