Skip to content

Commit 13e6560

Browse files
committed
rust: intercept service takes responsibility of the receiver end
1 parent 64aa940 commit 13e6560

File tree

2 files changed

+49
-46
lines changed

2 files changed

+49
-46
lines changed

rust/bear/src/modes/intercept.rs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,18 @@ use std::process::{Command, ExitCode};
99
use std::sync::Arc;
1010
use std::{env, thread};
1111

12-
pub(crate) struct InterceptService {
12+
pub(super) struct InterceptService {
1313
collector: Arc<EventCollectorOnTcp>,
14-
receiver: Receiver<Envelope>,
15-
collector_thread: Option<thread::JoinHandle<()>>,
14+
network_thread: Option<thread::JoinHandle<()>>,
15+
output_thread: Option<thread::JoinHandle<()>>,
1616
}
1717

1818
impl InterceptService {
19-
pub fn new() -> anyhow::Result<Self> {
19+
pub fn new<F>(consumer: F) -> anyhow::Result<Self>
20+
where
21+
F: FnOnce(Receiver<Envelope>) -> anyhow::Result<()>,
22+
F: Send + 'static,
23+
{
2024
let collector = EventCollectorOnTcp::new()?;
2125
let collector_arc = Arc::new(collector);
2226
let (sender, receiver) = bounded(32);
@@ -25,18 +29,18 @@ impl InterceptService {
2529
let collector_thread = thread::spawn(move || {
2630
collector_in_thread.collect(sender).unwrap();
2731
});
32+
let receiver_in_thread = receiver.clone();
33+
let output_thread = thread::spawn(move || {
34+
consumer(receiver_in_thread).unwrap();
35+
});
2836

2937
Ok(InterceptService {
3038
collector: collector_arc,
31-
receiver,
32-
collector_thread: Some(collector_thread),
39+
network_thread: Some(collector_thread),
40+
output_thread: Some(output_thread),
3341
})
3442
}
3543

36-
pub fn receiver(&self) -> Receiver<Envelope> {
37-
self.receiver.clone()
38-
}
39-
4044
pub fn address(&self) -> String {
4145
self.collector.address()
4246
}
@@ -45,13 +49,16 @@ impl InterceptService {
4549
impl Drop for InterceptService {
4650
fn drop(&mut self) {
4751
self.collector.stop().expect("Failed to stop the collector");
48-
if let Some(thread) = self.collector_thread.take() {
52+
if let Some(thread) = self.network_thread.take() {
4953
thread.join().expect("Failed to join the collector thread");
5054
}
55+
if let Some(thread) = self.output_thread.take() {
56+
thread.join().expect("Failed to join the output thread");
57+
}
5158
}
5259
}
5360

54-
pub(crate) enum InterceptEnvironment {
61+
pub(super) enum InterceptEnvironment {
5562
Wrapper {
5663
bin_dir: tempfile::TempDir,
5764
address: String,

rust/bear/src/modes/mod.rs

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@ pub mod recognition;
55
pub mod transformation;
66

77
use crate::input::EventFileReader;
8+
use crate::intercept::Envelope;
89
use crate::output::OutputWriter;
910
use crate::{args, config};
1011
use anyhow::Context;
12+
use crossbeam_channel::Receiver;
1113
use intercept::{InterceptEnvironment, InterceptService};
1214
use recognition::Recognition;
1315
use std::io::BufWriter;
1416
use std::process::ExitCode;
15-
use std::thread;
1617
use transformation::Transformation;
1718

1819
/// The mode trait is used to run the application in different modes.
@@ -56,43 +57,38 @@ impl Intercept {
5657
config,
5758
}
5859
}
60+
61+
fn write_to_file(
62+
output_file_name: String,
63+
envelopes: Receiver<Envelope>,
64+
) -> anyhow::Result<()> {
65+
let mut writer = std::fs::File::create(&output_file_name)
66+
.map(BufWriter::new)
67+
.with_context(|| format!("Failed to create output file: {:?}", &output_file_name))?;
68+
for envelope in envelopes.iter() {
69+
envelope
70+
.write_into(&mut writer)
71+
.with_context(|| "Failed to write the envelope")?;
72+
}
73+
Ok(())
74+
}
5975
}
6076

6177
impl Mode for Intercept {
6278
fn run(self) -> anyhow::Result<ExitCode> {
63-
match &self.config {
64-
config::Intercept::Wrapper { .. } => {
65-
let service = InterceptService::new()
66-
.with_context(|| "Failed to create the intercept service")?;
67-
let environment = InterceptEnvironment::new(&self.config, service.address())
68-
.with_context(|| "Failed to create the intercept environment")?;
69-
70-
// start writer thread
71-
let writer_thread = thread::spawn(move || {
72-
let file = std::fs::File::create(&self.output.file_name).expect(
73-
format!("Failed to create output file: {:?}", self.output.file_name)
74-
.as_str(),
75-
);
76-
let mut writer = BufWriter::new(file);
77-
for envelope in service.receiver().iter() {
78-
envelope
79-
.write_into(&mut writer)
80-
.expect("Failed to write the envelope");
81-
}
82-
});
83-
84-
let status = environment.execute_build_command(self.command);
85-
86-
writer_thread
87-
.join()
88-
.expect("Failed to join the writer thread");
89-
90-
status
91-
}
92-
config::Intercept::Preload { .. } => {
93-
todo!()
94-
}
95-
}
79+
let output_file_name = self.output.file_name.clone();
80+
let service = InterceptService::new(move |envelopes| {
81+
Self::write_to_file(output_file_name, envelopes)
82+
})
83+
.with_context(|| "Failed to create the intercept service")?;
84+
let environment = InterceptEnvironment::new(&self.config, service.address())
85+
.with_context(|| "Failed to create the intercept environment")?;
86+
87+
let status = environment
88+
.execute_build_command(self.command)
89+
.with_context(|| "Failed to execute the build command")?;
90+
91+
Ok(status)
9692
}
9793
}
9894

0 commit comments

Comments
 (0)