Skip to content

Commit 52e5009

Browse files
committed
rust: intercept module tested to work to transfer events
1 parent 857e7df commit 52e5009

File tree

4 files changed

+137
-88
lines changed

4 files changed

+137
-88
lines changed

rust/intercept/src/collector.rs

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,38 +17,40 @@
1717
along with this program. If not, see <http://www.gnu.org/licenses/>.
1818
*/
1919

20-
use std::net::{TcpListener, TcpStream};
20+
use std::net::{SocketAddr, TcpListener, TcpStream};
2121

22-
use crossbeam::channel::{Receiver, Sender};
23-
use crossbeam_channel::bounded;
22+
use crossbeam::channel::Sender;
2423
use serde::{Deserialize, Serialize};
24+
use std::sync::atomic::{AtomicBool, Ordering};
25+
use std::sync::Arc;
2526

2627
use super::Envelope;
2728

2829
#[derive(Serialize, Deserialize, Debug, PartialEq)]
2930
pub struct SessionLocator(pub String);
3031

3132
pub trait EventCollector {
32-
fn address(&self) -> Result<SessionLocator, anyhow::Error>;
33+
fn address(&self) -> SessionLocator;
3334
fn collect(&self, destination: Sender<Envelope>) -> Result<(), anyhow::Error>;
3435
fn stop(&self) -> Result<(), anyhow::Error>;
3536
}
3637

3738
pub struct EventCollectorOnTcp {
38-
control_input: Sender<bool>,
39-
control_output: Receiver<bool>,
39+
shutdown: Arc<AtomicBool>,
4040
listener: TcpListener,
41+
address: SocketAddr,
4142
}
4243

4344
impl EventCollectorOnTcp {
4445
pub fn new() -> Result<Self, anyhow::Error> {
45-
let (control_input, control_output) = bounded(0);
46+
let shutdown = Arc::new(AtomicBool::new(false));
4647
let listener = TcpListener::bind("127.0.0.1:0")?;
48+
let address = listener.local_addr()?;
4749

4850
let result = EventCollectorOnTcp {
49-
control_input,
50-
control_output,
51+
shutdown,
5152
listener,
53+
address,
5254
};
5355

5456
Ok(result)
@@ -67,25 +69,20 @@ impl EventCollectorOnTcp {
6769
}
6870

6971
impl EventCollector for EventCollectorOnTcp {
70-
fn address(&self) -> Result<SessionLocator, anyhow::Error> {
71-
let local_addr = self.listener.local_addr()?;
72-
let locator = SessionLocator(local_addr.to_string());
73-
Ok(locator)
72+
fn address(&self) -> SessionLocator {
73+
SessionLocator(self.address.to_string())
7474
}
7575

7676
fn collect(&self, destination: Sender<Envelope>) -> Result<(), anyhow::Error> {
77-
loop {
78-
if let Ok(shutdown) = self.control_output.try_recv() {
79-
if shutdown {
80-
break;
81-
}
77+
for stream in self.listener.incoming() {
78+
if self.shutdown.load(Ordering::Relaxed) {
79+
break;
8280
}
8381

84-
match self.listener.accept() {
85-
Ok((stream, _)) => {
86-
println!("Got a connection");
82+
match stream {
83+
Ok(connection) => {
8784
// ... (process the connection in a separate thread or task)
88-
self.send(stream, destination.clone())?;
85+
self.send(connection, destination.clone())?;
8986
}
9087
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
9188
// No new connection available, continue checking for shutdown
@@ -97,13 +94,12 @@ impl EventCollector for EventCollectorOnTcp {
9794
}
9895
}
9996
}
100-
101-
println!("Server shutting down");
10297
Ok(())
10398
}
10499

105100
fn stop(&self) -> Result<(), anyhow::Error> {
106-
self.control_input.send(true)?;
101+
self.shutdown.store(true, Ordering::Relaxed);
102+
let _ = TcpStream::connect(self.address)?;
107103
Ok(())
108104
}
109105
}

rust/intercept/src/lib.rs

Lines changed: 4 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ pub mod reporter;
3434
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
3535
pub struct ReporterId(pub u64);
3636

37-
#[derive(Serialize, Deserialize, Debug, PartialEq)]
37+
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
3838
pub struct ProcessId(pub u32);
3939

40-
#[derive(Serialize, Deserialize, Debug, PartialEq)]
40+
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
4141
pub struct Execution {
4242
pub executable: PathBuf,
4343
pub arguments: Vec<String>,
@@ -51,7 +51,7 @@ pub struct Execution {
5151
// terminate), but can be extended later with performance related
5252
// events like monitoring the CPU usage or the memory allocation if
5353
// this information is available.
54-
#[derive(Serialize, Deserialize, Debug, PartialEq)]
54+
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
5555
pub enum Event {
5656
Started {
5757
pid: ProcessId,
@@ -66,7 +66,7 @@ pub enum Event {
6666
},
6767
}
6868

69-
#[derive(Serialize, Deserialize, Debug, PartialEq)]
69+
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
7070
pub struct Envelope {
7171
pub rid: ReporterId,
7272
pub timestamp: u64,
@@ -106,55 +106,3 @@ impl Envelope {
106106
Ok(length)
107107
}
108108
}
109-
110-
#[cfg(test)]
111-
mod test {
112-
use super::*;
113-
use lazy_static::lazy_static;
114-
use std::io::Cursor;
115-
116-
#[test]
117-
fn read_write_works() {
118-
let mut writer = Cursor::new(vec![0; 1024]);
119-
for envelope in ENVELOPES.iter() {
120-
let result = Envelope::write_into(envelope, &mut writer);
121-
assert!(result.is_ok());
122-
}
123-
124-
let mut reader = Cursor::new(writer.get_ref());
125-
for envelope in ENVELOPES.iter() {
126-
let result = Envelope::read_from(&mut reader);
127-
assert!(result.is_ok());
128-
assert_eq!(result.unwrap(), *envelope.clone());
129-
}
130-
}
131-
132-
lazy_static! {
133-
static ref ENVELOPES: Vec<Envelope> = vec![
134-
Envelope {
135-
rid: ReporterId(1),
136-
timestamp: 0,
137-
event: Event::Started {
138-
pid: ProcessId(1),
139-
ppid: ProcessId(0),
140-
execution: Execution {
141-
executable: PathBuf::from("/usr/bin/ls"),
142-
arguments: vec!["-l".to_string()],
143-
working_dir: PathBuf::from("/tmp"),
144-
environment: HashMap::new(),
145-
},
146-
},
147-
},
148-
Envelope {
149-
rid: ReporterId(1),
150-
timestamp: 0,
151-
event: Event::Terminated { status: 0 },
152-
},
153-
Envelope {
154-
rid: ReporterId(1),
155-
timestamp: 0,
156-
event: Event::Signaled { signal: 15 },
157-
},
158-
];
159-
}
160-
}

rust/intercept/src/reporter.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,18 @@ impl ReporterId {
3636
// supervisor processes). The events are collected in a common place
3737
// in order to reconstruct of final report of a build process.
3838
pub trait Reporter {
39-
fn report(&mut self, event: Event) -> Result<(), anyhow::Error>;
39+
fn report(&self, event: Event) -> Result<(), anyhow::Error>;
4040
}
4141

42-
struct TcpReporter {
43-
socket: TcpStream,
42+
pub struct TcpReporter {
4443
destination: String,
4544
reporter_id: ReporterId,
4645
}
4746

4847
impl TcpReporter {
4948
pub fn new(destination: String) -> Result<Self, anyhow::Error> {
50-
let socket = TcpStream::connect(destination.clone())?;
5149
let reporter_id = ReporterId::new();
5250
let result = TcpReporter {
53-
socket,
5451
destination,
5552
reporter_id,
5653
};
@@ -59,9 +56,10 @@ impl TcpReporter {
5956
}
6057

6158
impl Reporter for TcpReporter {
62-
fn report(&mut self, event: Event) -> Result<(), anyhow::Error> {
59+
fn report(&self, event: Event) -> Result<(), anyhow::Error> {
6360
let envelope = Envelope::new(&self.reporter_id, event);
64-
envelope.write_into(&mut self.socket)?;
61+
let mut socket = TcpStream::connect(self.destination.clone())?;
62+
envelope.write_into(&mut socket)?;
6563

6664
Ok(())
6765
}

rust/intercept/tests/test.rs

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
use intercept::collector::{EventCollector, EventCollectorOnTcp};
2+
use intercept::reporter::{Reporter, TcpReporter};
3+
use intercept::*;
4+
5+
mod test {
6+
use super::*;
7+
use crossbeam_channel::bounded;
8+
use lazy_static::lazy_static;
9+
use std::collections::HashMap;
10+
use std::io::Cursor;
11+
use std::path::PathBuf;
12+
use std::sync::Arc;
13+
use std::thread;
14+
use std::time::Duration;
15+
16+
// Test that the TCP reporter and the TCP collector work together.
17+
// We create a TCP collector and a TCP reporter, then we send events
18+
// to the reporter and check if the collector receives them.
19+
//
20+
// We use a bounded channel to send the events from the reporter to the
21+
// collector. The collector reads the events from the channel and checks
22+
// if they are the same as the original events.
23+
#[test]
24+
fn tcp_reporter_and_collectors_work() {
25+
let collector = EventCollectorOnTcp::new().unwrap();
26+
let reporter = TcpReporter::new(collector.address().0).unwrap();
27+
28+
// Create wrapper to share the collector across threads.
29+
let thread_collector = Arc::new(collector);
30+
let main_collector = thread_collector.clone();
31+
32+
// Start the collector in a separate thread.
33+
let (input, output) = bounded(EVENTS.len());
34+
let receiver_thread = thread::spawn(move || {
35+
thread_collector.collect(input).unwrap();
36+
});
37+
// Send events to the reporter.
38+
for event in EVENTS.iter() {
39+
let result = reporter.report(event.clone());
40+
assert!(result.is_ok());
41+
}
42+
43+
// Call the stop method to stop the collector. This will close the
44+
// channel and the collector will stop reading from it.
45+
thread::sleep(Duration::from_secs(1));
46+
main_collector.stop().unwrap();
47+
48+
// Empty the channel and assert that we received all the events.
49+
let mut count = 0;
50+
for envelope in output.iter() {
51+
assert!(EVENTS.contains(&envelope.event));
52+
count += 1;
53+
}
54+
assert_eq!(count, EVENTS.len());
55+
// shutdown the receiver thread
56+
receiver_thread.join().unwrap();
57+
}
58+
59+
// Test that the serialization and deserialization of the Envelope works.
60+
// We write the Envelope to a buffer and read it back to check if the
61+
// deserialized Envelope is the same as the original one.
62+
#[test]
63+
fn read_write_works() {
64+
let mut writer = Cursor::new(vec![0; 1024]);
65+
for envelope in ENVELOPES.iter() {
66+
let result = Envelope::write_into(envelope, &mut writer);
67+
assert!(result.is_ok());
68+
}
69+
70+
let mut reader = Cursor::new(writer.get_ref());
71+
for envelope in ENVELOPES.iter() {
72+
let result = Envelope::read_from(&mut reader);
73+
assert!(result.is_ok());
74+
assert_eq!(result.unwrap(), envelope.clone());
75+
}
76+
}
77+
78+
lazy_static! {
79+
static ref ENVELOPES: Vec<Envelope> = vec![
80+
Envelope {
81+
rid: ReporterId(1),
82+
timestamp: 0,
83+
event: Event::Started {
84+
pid: ProcessId(1),
85+
ppid: ProcessId(0),
86+
execution: Execution {
87+
executable: PathBuf::from("/usr/bin/ls"),
88+
arguments: vec!["ls".to_string(), "-l".to_string()],
89+
working_dir: PathBuf::from("/tmp"),
90+
environment: HashMap::new(),
91+
},
92+
},
93+
},
94+
Envelope {
95+
rid: ReporterId(1),
96+
timestamp: 0,
97+
event: Event::Terminated { status: 0 },
98+
},
99+
Envelope {
100+
rid: ReporterId(1),
101+
timestamp: 0,
102+
event: Event::Signaled { signal: 15 },
103+
},
104+
];
105+
static ref EVENTS: Vec<Event> = ENVELOPES.iter().map(|e| e.event.clone()).collect();
106+
}
107+
}

0 commit comments

Comments
 (0)