Skip to content

Commit 252fc7d

Browse files
feat(reflectors): fetch data from a single namespace (#1417)
1 parent edcc484 commit 252fc7d

File tree

18 files changed

+322
-258
lines changed

18 files changed

+322
-258
lines changed

agent-control/src/agent_control/config.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,27 @@ pub fn helmrepository_type_meta() -> TypeMeta {
233233
}
234234
}
235235

236+
pub fn statefulset_type_meta() -> TypeMeta {
237+
TypeMeta {
238+
api_version: "apps/v1".to_string(),
239+
kind: "StatefulSet".to_string(),
240+
}
241+
}
242+
243+
pub fn daemonset_type_meta() -> TypeMeta {
244+
TypeMeta {
245+
api_version: "apps/v1".to_string(),
246+
kind: "DaemonSet".to_string(),
247+
}
248+
}
249+
250+
pub fn deployment_type_meta() -> TypeMeta {
251+
TypeMeta {
252+
api_version: "apps/v1".to_string(),
253+
kind: "Deployment".to_string(),
254+
}
255+
}
256+
236257
pub fn default_group_version_kinds() -> Vec<TypeMeta> {
237258
// In flux health check we are currently supporting just a single helm_release_type_meta
238259
// Each time we support a new version we should decide if and how to support retrieving its health

agent-control/src/agent_control/health_checker/k8s.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ pub fn agent_control_health_checker_builder(
2020
k8s_client.clone(),
2121
RELEASE_NAME.to_string(),
2222
namespace.clone(),
23+
Some(namespace.clone()),
2324
start_time,
2425
),
2526
start_time,

agent-control/src/agent_control/resource_cleaner/k8s_garbage_collector.rs

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ pub struct K8sGarbageCollector {
2929
pub k8s_client: Arc<SyncK8sClient>,
3030
/// The namespace where the Agent Control stores data via configMaps.
3131
pub namespace: String,
32+
/// The namespace where agents are running. We are garbage collecting resources here only due to Instrumentation
33+
pub namespace_agents: String,
3234
pub cr_type_meta: Vec<TypeMeta>,
3335
}
3436

@@ -42,7 +44,8 @@ impl K8sGarbageCollector {
4244
) -> Result<(), K8sGarbageCollectorError> {
4345
let mode = K8sGarbageCollectorMode::RetainConfig(&active_agents);
4446
self.garbage_collection_config_maps(&mode)?;
45-
self.garbage_collection_dynamic_object(&mode)
47+
self.garbage_collection_dynamic_object(&mode, &self.namespace_agents)?;
48+
self.garbage_collection_dynamic_object(&mode, &self.namespace)
4649
}
4750

4851
/// Garbage collect resources managed by AC associated to a certain
@@ -60,7 +63,8 @@ impl K8sGarbageCollector {
6063

6164
let mode = K8sGarbageCollectorMode::Collect(id, agent_type_id);
6265
self.garbage_collection_config_maps(&mode)?;
63-
self.garbage_collection_dynamic_object(&mode)
66+
self.garbage_collection_dynamic_object(&mode, &self.namespace_agents)?;
67+
self.garbage_collection_dynamic_object(&mode, &self.namespace)
6468
}
6569

6670
pub fn active_config_ids(active_config: &SubAgentsMap) -> HashMap<AgentID, AgentTypeID> {
@@ -86,10 +90,11 @@ impl K8sGarbageCollector {
8690
fn garbage_collection_dynamic_object(
8791
&self,
8892
mode: &K8sGarbageCollectorMode,
93+
namespace: &str,
8994
) -> Result<(), K8sGarbageCollectorError> {
9095
// Delete dynamic resources depending on mode
9196
self.cr_type_meta.iter().try_for_each(|tm| {
92-
match self.k8s_client.list_dynamic_objects_in_all_namespaces(tm) {
97+
match self.k8s_client.list_dynamic_objects(tm, namespace) {
9398
Ok(dyn_objs) => {
9499
dyn_objs
95100
.into_iter()
@@ -261,21 +266,21 @@ mod tests {
261266
use mockall::predicate;
262267

263268
const TEST_NAMESPACE: &str = "test-namespace";
269+
const TEST_NAMESPACE_AGENTS: &str = "test-namespace-agents";
264270

265271
#[test]
266272
fn errors_if_ac_id() {
267273
let mut k8s_client = SyncK8sClient::default();
268274
// collect should return immediately on AC ID, and return with an error
269275
k8s_client.expect_delete_configmap_collection().never();
270-
k8s_client
271-
.expect_list_dynamic_objects_in_all_namespaces()
272-
.never();
276+
k8s_client.expect_list_dynamic_objects().never();
273277
k8s_client.expect_delete_dynamic_object().never();
274278

275279
let garbage_collector = K8sGarbageCollector {
276280
k8s_client: Arc::new(k8s_client),
277281
cr_type_meta: vec![],
278282
namespace: TEST_NAMESPACE.to_string(),
283+
namespace_agents: TEST_NAMESPACE_AGENTS.to_string(),
279284
};
280285
let ac_id = &AgentID::new_agent_control_id();
281286
let ac_type_id =
@@ -297,18 +302,29 @@ mod tests {
297302
.once()
298303
.with(predicate::eq(TEST_NAMESPACE), predicate::eq("app.kubernetes.io/managed-by==newrelic-agent-control, newrelic.io/agent-id in (foo-agent)"))
299304
.returning(|_, _| Ok(()));
300-
301305
k8s_client
302-
.expect_list_dynamic_objects_in_all_namespaces()
306+
.expect_list_dynamic_objects()
307+
.once()
308+
.with(
309+
predicate::eq(type_meta.clone()),
310+
predicate::eq(TEST_NAMESPACE_AGENTS),
311+
)
312+
.returning(|_, _| Ok(vec![]));
313+
k8s_client
314+
.expect_list_dynamic_objects()
303315
.once()
304-
.with(predicate::eq(type_meta.clone()))
305-
.returning(|_| Ok(vec![]));
316+
.with(
317+
predicate::eq(type_meta.clone()),
318+
predicate::eq(TEST_NAMESPACE),
319+
)
320+
.returning(|_, _| Ok(vec![]));
306321
k8s_client.expect_delete_dynamic_object().never();
307322

308323
let garbage_collector = K8sGarbageCollector {
309324
k8s_client: Arc::new(k8s_client),
310325
cr_type_meta: vec![type_meta],
311326
namespace: TEST_NAMESPACE.to_string(),
327+
namespace_agents: TEST_NAMESPACE_AGENTS.to_string(),
312328
};
313329
let ac_id = &AgentID::new("foo-agent").unwrap();
314330
let agent_type_id = &AgentTypeID::try_from("newrelic/com.example.foo:0.0.1").unwrap();

agent-control/src/agent_control/run/k8s.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ impl AgentControlRunner {
163163
let garbage_collector = K8sGarbageCollector {
164164
k8s_client: k8s_client.clone(),
165165
namespace: self.k8s_config.namespace.clone(),
166+
namespace_agents: self.k8s_config.namespace_agents.clone(),
166167
cr_type_meta: self.k8s_config.cr_type_meta,
167168
};
168169

agent-control/src/health/k8s/health_checker.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::health::health_checker::{HealthChecker, HealthCheckerError, Healthy};
33
use crate::health::with_start_time::{HealthWithStartTime, StartTime};
44
#[cfg_attr(test, mockall_double::double)]
55
use crate::k8s::client::SyncK8sClient;
6-
use crate::k8s::utils::{get_name, get_namespace, get_type_meta};
6+
use crate::k8s::utils::{get_name, get_namespace, get_target_namespace, get_type_meta};
77
use kube::api::{DynamicObject, TypeMeta};
88
use resources::{
99
daemon_set::K8sHealthDaemonSet, deployment::K8sHealthDeployment,
@@ -49,32 +49,38 @@ pub fn health_checkers_for_type_meta(
4949
k8s_client: Arc<SyncK8sClient>,
5050
name: String,
5151
namespace: String,
52+
target_namespace: Option<String>,
5253
start_time: StartTime,
5354
) -> Vec<K8sResourceHealthChecker> {
5455
// HelmRelease (Flux CR)
5556
if type_meta == helmrelease_v2_type_meta() {
57+
let target_namespace = target_namespace.unwrap_or(namespace.clone());
58+
5659
vec![
5760
K8sResourceHealthChecker::Flux(K8sHealthFluxHelmRelease::new(
5861
k8s_client.clone(),
5962
type_meta,
6063
name.clone(),
61-
namespace,
64+
namespace.clone(),
6265
start_time,
6366
)),
6467
K8sResourceHealthChecker::StatefulSet(K8sHealthStatefulSet::new(
6568
k8s_client.clone(),
6669
name.clone(),
6770
start_time,
71+
target_namespace.clone(),
6872
)),
6973
K8sResourceHealthChecker::DaemonSet(K8sHealthDaemonSet::new(
7074
k8s_client.clone(),
7175
name.clone(),
7276
start_time,
77+
target_namespace.clone(),
7378
)),
7479
K8sResourceHealthChecker::Deployment(K8sHealthDeployment::new(
7580
k8s_client.clone(),
7681
name,
7782
start_time,
83+
target_namespace,
7884
)),
7985
]
8086
// Instrumentation (Newrelic CR)
@@ -111,12 +117,14 @@ impl K8sHealthChecker<K8sResourceHealthChecker> {
111117

112118
let name = get_name(resource)?;
113119
let namespace = get_namespace(resource)?;
120+
let target_namespace = get_target_namespace(resource);
114121

115122
let resource_health_checkers = health_checkers_for_type_meta(
116123
type_meta,
117124
k8s_client.clone(),
118125
name,
119126
namespace,
127+
target_namespace,
120128
start_time,
121129
);
122130

agent-control/src/health/k8s/health_checker/resources/daemon_set.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@ pub struct K8sHealthDaemonSet {
1717
k8s_client: Arc<SyncK8sClient>,
1818
release_name: String,
1919
start_time: StartTime,
20+
namespace: String,
2021
}
2122

2223
impl HealthChecker for K8sHealthDaemonSet {
2324
fn check_health(&self) -> Result<HealthWithStartTime, HealthCheckerError> {
24-
let daemon_sets = self.k8s_client.list_daemon_set();
25+
let daemon_sets = self.k8s_client.list_daemon_set(&self.namespace)?;
2526

2627
let target_daemon_sets = daemon_sets
2728
.into_iter()
@@ -38,11 +39,13 @@ impl K8sHealthDaemonSet {
3839
k8s_client: Arc<SyncK8sClient>,
3940
release_name: String,
4041
start_time: StartTime,
42+
namespace: String,
4143
) -> Self {
4244
Self {
4345
k8s_client,
4446
release_name,
4547
start_time,
48+
namespace,
4649
}
4750
}
4851

@@ -139,6 +142,7 @@ pub mod tests {
139142
api::apps::v1::{DaemonSetSpec, DaemonSetStatus},
140143
apimachinery::pkg::apis::meta::v1::ObjectMeta,
141144
};
145+
pub const TEST_NAMESPACE: &str = "test-namespace";
142146

143147
const TEST_DAEMON_SET_NAME: &str = "test";
144148

@@ -415,17 +419,21 @@ pub mod tests {
415419
k8s_client
416420
.expect_list_daemon_set()
417421
.times(1)
418-
.returning(move || {
419-
vec![
422+
.returning(move |_| {
423+
Ok(vec![
420424
Arc::new(healthy_matching.clone()),
421425
Arc::new(unhealthy_matching.clone()),
422-
]
426+
])
423427
});
424428

425429
let start_time = StartTime::now();
426430

427-
let health_checker =
428-
K8sHealthDaemonSet::new(Arc::new(k8s_client), release_name.to_string(), start_time);
431+
let health_checker = K8sHealthDaemonSet::new(
432+
Arc::new(k8s_client),
433+
release_name.to_string(),
434+
start_time,
435+
TEST_NAMESPACE.to_string(),
436+
);
429437
let health = health_checker.check_health().unwrap();
430438

431439
assert_eq!(

agent-control/src/health/k8s/health_checker/resources/deployment.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,12 @@ pub struct K8sHealthDeployment {
1414
k8s_client: Arc<SyncK8sClient>,
1515
release_name: String,
1616
start_time: StartTime,
17+
namespace: String,
1718
}
1819

1920
impl HealthChecker for K8sHealthDeployment {
2021
fn check_health(&self) -> Result<HealthWithStartTime, HealthCheckerError> {
21-
let deployments = self.k8s_client.list_deployment();
22+
let deployments = self.k8s_client.list_deployment(&self.namespace)?;
2223

2324
let target_deployments = deployments
2425
.into_iter()
@@ -35,11 +36,13 @@ impl K8sHealthDeployment {
3536
k8s_client: Arc<SyncK8sClient>,
3637
release_name: String,
3738
start_time: StartTime,
39+
namespace: String,
3840
) -> Self {
3941
Self {
4042
k8s_client,
4143
release_name,
4244
start_time,
45+
namespace,
4346
}
4447
}
4548

@@ -87,6 +90,7 @@ impl K8sHealthDeployment {
8790
mod tests {
8891
use super::*;
8992
use crate::health::health_checker::Healthy;
93+
use crate::health::k8s::health_checker::resources::daemon_set::tests::TEST_NAMESPACE;
9094
use crate::{health::k8s::health_checker::LABEL_RELEASE_FLUX, k8s::client::MockSyncK8sClient};
9195
use k8s_openapi::api::apps::v1::{
9296
Deployment, DeploymentSpec, DeploymentStatus, DeploymentStrategy, RollingUpdateDeployment,
@@ -369,13 +373,14 @@ mod tests {
369373
k8s_client
370374
.expect_list_deployment()
371375
.times(1)
372-
.returning(move || self.deployments.clone());
376+
.returning(move |_| Ok(self.deployments.clone()));
373377

374378
let start_time = StartTime::now();
375379
let health_checker = K8sHealthDeployment::new(
376380
Arc::new(k8s_client),
377381
"flux-release".to_string(),
378382
start_time,
383+
TEST_NAMESPACE.to_string(),
379384
);
380385
let health = health_checker.check_health().unwrap_or_else(|_| {
381386
panic!("Unexpected error getting health for test - {}", self.name)

agent-control/src/health/k8s/health_checker/resources/stateful_set.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@ pub struct K8sHealthStatefulSet {
1616
k8s_client: Arc<SyncK8sClient>,
1717
release_name: String,
1818
start_time: StartTime,
19+
namespace: String,
1920
}
2021

2122
impl HealthChecker for K8sHealthStatefulSet {
2223
fn check_health(&self) -> Result<HealthWithStartTime, HealthCheckerError> {
23-
let stateful_sets = self.k8s_client.list_stateful_set();
24+
let stateful_sets = self.k8s_client.list_stateful_set(&self.namespace)?;
2425

2526
let target_stateful_sets = stateful_sets
2627
.into_iter()
@@ -37,11 +38,13 @@ impl K8sHealthStatefulSet {
3738
k8s_client: Arc<SyncK8sClient>,
3839
release_name: String,
3940
start_time: StartTime,
41+
namespace: String,
4042
) -> Self {
4143
Self {
4244
k8s_client,
4345
release_name,
4446
start_time,
47+
namespace,
4548
}
4649
}
4750

@@ -79,6 +82,7 @@ impl K8sHealthStatefulSet {
7982
mod tests {
8083
use super::*;
8184
use crate::health::health_checker::Healthy;
85+
use crate::health::k8s::health_checker::resources::daemon_set::tests::TEST_NAMESPACE;
8286
use crate::{health::k8s::health_checker::LABEL_RELEASE_FLUX, k8s::client::MockSyncK8sClient};
8387
use assert_matches::assert_matches;
8488
use k8s_openapi::api::apps::v1::{StatefulSetSpec, StatefulSetStatus};
@@ -278,17 +282,21 @@ mod tests {
278282
k8s_client
279283
.expect_list_stateful_set()
280284
.times(1)
281-
.returning(move || {
282-
vec![
285+
.returning(move |_| {
286+
Ok(vec![
283287
Arc::new(with_err_not_matching.clone()),
284288
Arc::new(healthy_matching.clone()),
285-
]
289+
])
286290
});
287291

288292
let start_time = StartTime::now();
289293

290-
let health_checker =
291-
K8sHealthStatefulSet::new(Arc::new(k8s_client), release_name.to_string(), start_time);
294+
let health_checker = K8sHealthStatefulSet::new(
295+
Arc::new(k8s_client),
296+
release_name.to_string(),
297+
start_time,
298+
TEST_NAMESPACE.to_string(),
299+
);
292300
let result = health_checker.check_health().unwrap();
293301
assert_eq!(
294302
result,

agent-control/src/k8s.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ pub mod client;
44
mod dynamic_object;
55
pub mod error;
66
pub mod labels;
7-
pub mod reflector;
7+
pub mod reflectors;
88
pub mod store;
99
pub mod utils;

0 commit comments

Comments
 (0)