Skip to content

Commit c1b79e7

Browse files
Rajneesh Lakkundifacebook-github-bot
Rajneesh Lakkundi
authored andcommitted
Introduce channel between subscriber and client
Summary: Rather than making direct calls to HealthCheckClient, send the events from HealthCheckSubscriber over a channel. The channel is bound to 100 events. If the client is unable to keep up, we drop the channel and client and stop the health checks. There are further safety mechanism in further diffs. Reviewed By: JakobDegen Differential Revision: D73570880 fbshipit-source-id: 595b83b83fa340b8ab23fa3ddb9008568c22c61c
1 parent 2c77ef6 commit c1b79e7

File tree

4 files changed

+420
-93
lines changed

4 files changed

+420
-93
lines changed

app/buck2_client_ctx/src/subscribers/health_check_subscriber.rs

Lines changed: 205 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,31 @@
77
* of this source tree.
88
*/
99

10-
#![allow(dead_code)] // TODO(rajneeshl): Remove this when we start forwarding events to client
10+
#![allow(dead_code)]
11+
// TODO(rajneeshl): Remove this when we start forwarding events to client
1112
use std::sync::Arc;
1213

1314
use async_trait::async_trait;
15+
use buck2_core::soft_error;
1416
use buck2_data::buck_event::Data::*;
1517
use buck2_error::BuckErrorContext;
1618
use buck2_events::BuckEvent;
1719
use buck2_health_check::health_check_client::HealthCheckClient;
20+
use buck2_health_check::health_check_client::StreamingHealthCheckClient;
21+
use buck2_health_check::interface::HealthCheckContextEvent;
22+
use buck2_health_check::interface::HealthCheckEvent;
1823
use buck2_health_check::report::DisplayReport;
1924
use tokio::sync::mpsc::Sender;
25+
use tokio::sync::mpsc::error::TrySendError;
2026

2127
use crate::subscribers::subscriber::EventSubscriber;
2228

29+
const EVENT_CHANNEL_SIZE: usize = 100;
30+
2331
/// This subscriber is responsible for forwarding events to the health check client
2432
pub struct HealthCheckSubscriber {
25-
health_check_client: HealthCheckClient,
33+
health_check_client: Option<Box<dyn HealthCheckClient>>,
34+
event_sender: Option<Sender<HealthCheckEvent>>,
2635
}
2736

2837
#[async_trait]
@@ -39,27 +48,36 @@ impl HealthCheckSubscriber {
3948
pub fn new(
4049
tags_sender: Sender<Vec<String>>,
4150
display_reports_sender: Sender<Vec<DisplayReport>>,
51+
) -> Box<Self> {
52+
let (tx, rx) = tokio::sync::mpsc::channel(EVENT_CHANNEL_SIZE);
53+
let client =
54+
StreamingHealthCheckClient::new(Some(tags_sender), Some(display_reports_sender), rx);
55+
Self::new_with_client(Some(Box::new(client)), tx)
56+
}
57+
58+
fn new_with_client(
59+
health_check_client: Option<Box<dyn HealthCheckClient>>,
60+
events_tx: Sender<HealthCheckEvent>,
4261
) -> Box<Self> {
4362
Box::new(Self {
44-
health_check_client: HealthCheckClient::new(
45-
Some(tags_sender),
46-
Some(display_reports_sender),
47-
),
63+
health_check_client,
64+
event_sender: Some(events_tx),
4865
})
4966
}
5067

5168
async fn handle_event(&mut self, event: &Arc<BuckEvent>) -> buck2_error::Result<()> {
52-
// Capture all errors in the subscribers and don't propagate them any further.
53-
// Health check failures should not affect the command in any way.
69+
if self.event_sender.is_none() || self.health_check_client.is_none() {
70+
return Ok(());
71+
}
5472

55-
let _res = match event.data() {
73+
let health_check_event = match event.data() {
5674
SpanStart(start) => match &start.data {
5775
Some(buck2_data::span_start_event::Data::Command(command)) => {
58-
self.health_check_client
59-
.update_command_data(command.clone())
60-
.await
76+
Some(HealthCheckEvent::HealthCheckContextEvent(
77+
HealthCheckContextEvent::CommandStart(command.clone()),
78+
))
6179
}
62-
_ => Ok(()),
80+
_ => None,
6381
},
6482
SpanEnd(end) => {
6583
use buck2_data::span_end_event::Data::*;
@@ -68,25 +86,30 @@ impl HealthCheckSubscriber {
6886
.as_ref()
6987
.buck_error_context("Missing `data` in SpanEnd")?
7088
{
71-
FileWatcher(file_watcher) => {
72-
if let Some(merge_base) = file_watcher
73-
.stats
89+
FileWatcher(file_watcher) => file_watcher
90+
.stats
91+
.as_ref()
92+
.and_then(|stats| stats.branched_from_revision.as_ref())
93+
.map(|merge_base| {
94+
HealthCheckEvent::HealthCheckContextEvent(
95+
HealthCheckContextEvent::BranchedFromRevision(merge_base.clone()),
96+
)
97+
}),
98+
ActionExecution(action_execution_end) => {
99+
let has_excess_cache_miss = action_execution_end
100+
.invalidation_info
74101
.as_ref()
75-
.and_then(|stats| stats.branched_from_revision.as_ref())
76-
{
77-
self.health_check_client
78-
.update_branched_from_revision(&merge_base)
79-
.await
102+
.is_some_and(|v| v.changed_file.is_none());
103+
104+
if has_excess_cache_miss {
105+
Some(HealthCheckEvent::HealthCheckContextEvent(
106+
HealthCheckContextEvent::HasExcessCacheMisses(),
107+
))
80108
} else {
81-
Ok(())
109+
None
82110
}
83111
}
84-
ActionExecution(action_execution_end) => {
85-
self.health_check_client
86-
.update_excess_cache_misses(action_execution_end)
87-
.await
88-
}
89-
_ => Ok(()),
112+
_ => None,
90113
}
91114
}
92115
Instant(instant) => {
@@ -96,23 +119,166 @@ impl HealthCheckSubscriber {
96119
.as_ref()
97120
.buck_error_context("Missing `data` in `Instant`")?
98121
{
99-
SystemInfo(system_info) => {
100-
self.health_check_client
101-
.update_experiment_configurations(system_info.clone())
102-
.await
103-
}
122+
SystemInfo(system_info) => Some(HealthCheckEvent::HealthCheckContextEvent(
123+
HealthCheckContextEvent::ExperimentConfigurations(system_info.clone()),
124+
)),
104125
TargetPatterns(target_patterns) => {
105-
self.health_check_client
106-
.update_parsed_target_patterns(target_patterns.clone())
107-
.await
126+
Some(HealthCheckEvent::HealthCheckContextEvent(
127+
HealthCheckContextEvent::ParsedTargetPatterns(target_patterns.clone()),
128+
))
108129
}
109-
Snapshot(snapshot) => self.health_check_client.run_checks(snapshot).await,
110-
_ => Ok(()),
130+
Snapshot(_) => Some(HealthCheckEvent::Snapshot()),
131+
_ => None,
111132
}
112133
}
113-
_ => Ok(()),
134+
_ => None,
114135
};
115136

137+
if let (Some(health_check_event), Some(event_sender)) =
138+
(health_check_event, &mut self.event_sender)
139+
{
140+
match event_sender.try_send(health_check_event) {
141+
Ok(_) => {}
142+
Err(TrySendError::Full(_)) => {
143+
self.close_client_connection_with_error_report(
144+
"Health check event channel full. Disabling health checks.",
145+
);
146+
}
147+
Err(TrySendError::Closed(_)) => {
148+
self.close_client_connection_with_error_report(
149+
"Health check event receiver closed. Disabling health checks.",
150+
);
151+
}
152+
}
153+
}
154+
Ok(())
155+
}
156+
157+
pub fn close_client_connection_with_error_report(&mut self, error: &str) {
158+
self.event_sender.take();
159+
self.health_check_client.take();
160+
let _ignored = soft_error!(
161+
"health_check_subscriber_error",
162+
buck2_error::buck2_error!(buck2_error::ErrorTag::HealthCheck, "{}", error)
163+
);
164+
}
165+
}
166+
167+
#[cfg(test)]
168+
mod tests {
169+
use std::time::SystemTime;
170+
171+
use buck2_health_check::interface::HealthCheckType;
172+
use buck2_health_check::report::DisplayReport;
173+
use buck2_health_check::report::HealthIssue;
174+
use buck2_health_check::report::Severity;
175+
use buck2_wrapper_common::invocation_id::TraceId;
176+
use tokio::sync::mpsc::Receiver;
177+
use tokio::sync::mpsc::{self};
178+
use tokio::task::JoinHandle;
179+
180+
use super::*;
181+
182+
struct TestHealthCheckClient {
183+
handle: JoinHandle<()>,
184+
}
185+
186+
impl TestHealthCheckClient {
187+
fn new(
188+
tags_tx: Sender<Vec<String>>,
189+
display_reports_tx: Sender<Vec<DisplayReport>>,
190+
mut event_rx: Receiver<HealthCheckEvent>,
191+
) -> Self {
192+
let handle = tokio::spawn(async move {
193+
while let Some(event) = event_rx.recv().await {
194+
match event {
195+
HealthCheckEvent::Snapshot() => {
196+
// Send test tags
197+
let _unused = tags_tx
198+
.send(vec!["test_tag1".to_owned(), "test_tag2".to_owned()])
199+
.await;
200+
201+
let _unused = display_reports_tx.send(test_reports()).await;
202+
}
203+
HealthCheckEvent::HealthCheckContextEvent(_) => {
204+
// Process context events if needed for tests
205+
}
206+
}
207+
}
208+
});
209+
Self { handle }
210+
}
211+
}
212+
213+
impl HealthCheckClient for TestHealthCheckClient {}
214+
215+
fn test_reports() -> Vec<DisplayReport> {
216+
vec![
217+
DisplayReport {
218+
health_check_type: HealthCheckType::StableRevision,
219+
health_issue: Some(HealthIssue {
220+
severity: Severity::Warning,
221+
message: "Test report 1".to_owned(),
222+
remediation: None,
223+
}),
224+
},
225+
DisplayReport {
226+
health_check_type: HealthCheckType::LowDiskSpace,
227+
health_issue: Some(HealthIssue {
228+
severity: Severity::Info,
229+
message: "Test report 2".to_owned(),
230+
remediation: None,
231+
}),
232+
},
233+
]
234+
}
235+
236+
fn test_event(data: buck2_data::buck_event::Data) -> Arc<BuckEvent> {
237+
Arc::new(BuckEvent::new(
238+
SystemTime::now(),
239+
TraceId::new(),
240+
None,
241+
None,
242+
data,
243+
))
244+
}
245+
246+
#[tokio::test]
247+
async fn test_health_check_subscriber() -> buck2_error::Result<()> {
248+
let (tags_tx, mut tags_rx) = mpsc::channel::<Vec<String>>(10);
249+
let (reports_tx, mut reports_rx) = mpsc::channel::<Vec<DisplayReport>>(10);
250+
let (events_tx, events_rx) = mpsc::channel::<HealthCheckEvent>(EVENT_CHANNEL_SIZE);
251+
252+
// Create client and subscriber
253+
let client = TestHealthCheckClient::new(tags_tx, reports_tx, events_rx);
254+
let mut subscriber =
255+
HealthCheckSubscriber::new_with_client(Some(Box::new(client)), events_tx);
256+
257+
// Create a snapshot event
258+
let event = test_event(
259+
buck2_data::InstantEvent {
260+
data: Some(Box::new(buck2_data::Snapshot::default()).into()),
261+
}
262+
.into(),
263+
);
264+
265+
// Send the event
266+
subscriber.handle_event(&event).await?;
267+
268+
// Verify tags were received
269+
let tags =
270+
tokio::time::timeout(std::time::Duration::from_millis(100), tags_rx.recv()).await?;
271+
assert_eq!(tags.unwrap(), vec!["test_tag1", "test_tag2"]);
272+
273+
// Verify reports were received
274+
let reports =
275+
tokio::time::timeout(std::time::Duration::from_millis(100), reports_rx.recv()).await?;
276+
assert!(reports.is_some());
277+
let reports = reports.unwrap();
278+
assert_eq!(reports.len(), 2);
279+
assert!(reports[0].health_issue.is_some());
280+
assert!(reports[1].health_issue.is_some());
281+
116282
Ok(())
117283
}
118284
}

0 commit comments

Comments
 (0)