Skip to content

Commit ada952e

Browse files
feat(taskbroker): Add Application to Worker Pool Mapping (#604)
* Add Application to Worker Mapping * Add Counter for Missing Worker Mapping * Add Missing Worker Mapping Metric to Push Channel Drain * Add Application (Worker Pool) Tags to Metrics, Change `getsentry` to `sentry` * Add Link Back to Comment on Signature Helper * Fix Tests Hopefully * Add Application Tag to Drain Loop Missing Mapping Metric * Try Fixing Stuck Unit Test Due to Shared DLQ Topic * Fix Lint
1 parent 8aad03c commit ada952e

7 files changed

Lines changed: 117 additions & 111 deletions

File tree

src/config.rs

Lines changed: 49 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -290,20 +290,14 @@ pub struct Config {
290290
/// Maximum time in milliseconds for a single push RPC to the worker service. This should be greater than the worker's internal timeout.
291291
pub push_timeout_ms: u64,
292292

293-
/// The worker service endpoint.
294-
pub worker_endpoint: String,
295-
296293
/// The hostname used to construct `callback_url` for task push requests.
297294
pub callback_addr: String,
298295

299296
/// The port used to construct `callback_url` for task push requests.
300297
pub callback_port: u32,
301298

302-
/// Application filter for push mode. When set, only pending activations for this application are considered.
303-
pub application: Option<String>,
304-
305-
/// List of namespaces for push mode. When set, application must also be set (store requirement).
306-
pub namespaces: Option<Vec<String>>,
299+
/// Maps every application to its worker endpoint, both represented as strings.
300+
pub worker_map: BTreeMap<String, String>,
307301
}
308302

309303
impl Default for Config {
@@ -385,11 +379,9 @@ impl Default for Config {
385379
push_queue_size: 1,
386380
push_queue_timeout_ms: 5000,
387381
push_timeout_ms: 30000,
388-
worker_endpoint: "http://127.0.0.1:50052".into(),
389382
callback_addr: "0.0.0.0".into(),
390383
callback_port: 50051,
391-
application: None,
392-
namespaces: None,
384+
worker_map: [("sentry".into(), "http://127.0.0.1:50052".into())].into(),
393385
}
394386
}
395387
}
@@ -527,6 +519,10 @@ mod tests {
527519
assert_eq!(config.max_pending_count, 2048);
528520
assert_eq!(config.max_processing_count, 2048);
529521
assert_eq!(config.vacuum_page_count, None);
522+
assert_eq!(
523+
config.worker_map.get("sentry").map(String::as_str),
524+
Some("http://127.0.0.1:50052")
525+
);
530526
}
531527

532528
#[test]
@@ -554,6 +550,9 @@ mod tests {
554550
max_processing_attempts: 5
555551
vacuum_page_count: 1000
556552
full_vacuum_on_start: true
553+
worker_map:
554+
sentry: http://worker-sentry:50052
555+
launchpad: http://worker-launchpad:50053
557556
"#,
558557
)?;
559558
// Env vars always override config file
@@ -588,6 +587,16 @@ mod tests {
588587
assert_eq!(config.vacuum_page_count, Some(1000));
589588
assert_eq!(config.db_max_size, Some(3_000_000_000));
590589
assert!(config.full_vacuum_on_start);
590+
assert_eq!(
591+
config.worker_map,
592+
BTreeMap::from([
593+
("sentry".to_owned(), "http://worker-sentry:50052".to_owned(),),
594+
(
595+
"launchpad".to_owned(),
596+
"http://worker-launchpad:50053".to_owned(),
597+
),
598+
])
599+
);
591600

592601
Ok(())
593602
});
@@ -632,6 +641,35 @@ mod tests {
632641
config.default_metrics_tags,
633642
BTreeMap::from([("key".to_owned(), "value".to_owned())])
634643
);
644+
assert_eq!(
645+
config.worker_map.get("sentry").map(String::as_str),
646+
Some("http://127.0.0.1:50052"),
647+
"partial env override must not drop worker_map defaults"
648+
);
649+
650+
Ok(())
651+
});
652+
}
653+
654+
/// `worker_map` uses the same env map encoding as `default_metrics_tags` (brace `key=value` pairs).
655+
#[test]
656+
fn test_worker_map_from_env() {
657+
Jail::expect_with(|jail| {
658+
jail.set_env("TASKBROKER_LOG_FILTER", "error");
659+
jail.set_env(
660+
"TASKBROKER_WORKER_MAP",
661+
"{sentry=http://127.0.0.1:60052,launchpad=http://127.0.0.1:60053}",
662+
);
663+
664+
let args = Args { config: None };
665+
let config = Config::from_args(&args).unwrap();
666+
assert_eq!(
667+
config.worker_map,
668+
BTreeMap::from([
669+
("sentry".to_owned(), "http://127.0.0.1:60052".to_owned(),),
670+
("launchpad".to_owned(), "http://127.0.0.1:60053".to_owned(),),
671+
])
672+
);
635673

636674
Ok(())
637675
});
@@ -876,49 +914,4 @@ mod tests {
876914
Ok(())
877915
});
878916
}
879-
880-
#[test]
881-
fn test_default_application_and_namespaces() {
882-
let config = Config::default();
883-
assert_eq!(config.application, None);
884-
assert_eq!(config.namespaces, None);
885-
}
886-
887-
#[test]
888-
fn test_from_args_application_from_env() {
889-
Jail::expect_with(|jail| {
890-
jail.set_env("TASKBROKER_APPLICATION", "getsentry");
891-
892-
let args = Args { config: None };
893-
let config = Config::from_args(&args).unwrap();
894-
assert_eq!(config.application.as_deref(), Some("getsentry"));
895-
assert_eq!(config.namespaces, None);
896-
897-
Ok(())
898-
});
899-
}
900-
901-
#[test]
902-
fn test_from_args_application_and_namespaces_from_config_file() {
903-
Jail::expect_with(|jail| {
904-
jail.create_file(
905-
"config.yaml",
906-
r#"
907-
application: getsentry
908-
namespaces:
909-
- ns1
910-
- ns2
911-
"#,
912-
)?;
913-
914-
let args = Args {
915-
config: Some("config.yaml".to_owned()),
916-
};
917-
let config = Config::from_args(&args).unwrap();
918-
assert_eq!(config.application.as_deref(), Some("getsentry"));
919-
assert_eq!(config.namespaces, Some(vec!["ns1".into(), "ns2".into()]));
920-
921-
Ok(())
922-
});
923-
}
924917
}

src/fetch/mod.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -115,13 +115,7 @@ impl<T: TaskPusher + Send + Sync + 'static> FetchPool<T> {
115115
let start = Instant::now();
116116
let mut backoff = false;
117117

118-
let application = config.application.as_deref();
119-
let namespaces = config.namespaces.as_deref();
120-
121-
match store
122-
.claim_activations_for_push(application, namespaces, limit, bucket)
123-
.await
124-
{
118+
match store.claim_activations_for_push(limit, bucket).await {
125119
Ok(activations) if activations.is_empty() => {
126120
metrics::counter!("fetch.empty").increment(1);
127121
debug!("No pending activations");

src/push/mod.rs

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::collections::HashMap;
12
use std::sync::Arc;
23
use std::time::{Duration, Instant};
34

@@ -113,7 +114,7 @@ impl PushPool {
113114
let mut push_pool: JoinSet<Result<()>> = crate::tokio::spawn_pool(
114115
self.config.push_threads,
115116
|_| {
116-
let endpoint = self.config.worker_endpoint.clone();
117+
let worker_map = self.config.worker_map.clone();
117118
let receiver = self.receiver.clone();
118119
let store = store.clone();
119120

@@ -130,21 +131,28 @@ impl PushPool {
130131
async move {
131132
metrics::counter!("push.worker.connect.attempt").increment(1);
132133

133-
let mut worker = match WorkerServiceClient::connect(endpoint).await {
134-
Ok(w) => {
135-
metrics::counter!("push.worker.connect", "result" => "ok").increment(1);
136-
w
137-
}
134+
let mut workers = HashMap::new();
138135

139-
Err(e) => {
140-
metrics::counter!("push.worker.connect", "result" => "error")
141-
.increment(1);
142-
error!("Failed to connect to worker - {:?}", e);
136+
for (application, endpoint) in worker_map.clone() {
137+
let worker = match WorkerServiceClient::connect(endpoint).await {
138+
Ok(w) => {
139+
metrics::counter!("push.worker.connect", "result" => "ok", "application" => application.clone())
140+
.increment(1);
141+
w
142+
}
143143

144-
// When we fail to connect, the taskbroker will shut down, but this may change in the future
145-
return Err(e.into());
146-
}
147-
};
144+
Err(e) => {
145+
metrics::counter!("push.worker.connect", "result" => "error", "application" => application.clone())
146+
.increment(1);
147+
error!(error = ?e, "Failed to connect to worker");
148+
149+
// When we fail to connect, the taskbroker will shut down, but this may change in the future
150+
return Err(e.into());
151+
}
152+
};
153+
154+
workers.insert(application, worker);
155+
}
148156

149157
loop {
150158
tokio::select! {
@@ -165,8 +173,20 @@ impl PushPool {
165173
let id = activation.id.clone();
166174
let callback_url = callback_url.clone();
167175

176+
let Some(worker) = workers.get_mut(&activation.application) else {
177+
metrics::counter!("push.missing_worker_mapping", "application" => activation.application.clone()).increment(1);
178+
179+
error!(
180+
task_id = %id,
181+
application = activation.application,
182+
"Task application has no worker pool mapping"
183+
);
184+
185+
continue
186+
};
187+
168188
match push_task(
169-
&mut worker,
189+
worker,
170190
activation,
171191
callback_url,
172192
timeout,
@@ -209,8 +229,20 @@ impl PushPool {
209229
let id = activation.id.clone();
210230
let callback_url = callback_url.clone();
211231

232+
let Some(worker) = workers.get_mut(&activation.application) else {
233+
metrics::counter!("push.missing_worker_mapping", "application" => activation.application.clone()).increment(1);
234+
235+
error!(
236+
task_id = %id,
237+
application = activation.application,
238+
"Task application has no worker pool mapping"
239+
);
240+
241+
continue;
242+
};
243+
212244
match push_task(
213-
&mut worker,
245+
worker,
214246
activation,
215247
callback_url,
216248
timeout,

src/store/tests.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ async fn test_get_pending_activation_bucket_filter(#[case] adapter: &str) {
265265
assert!(store.store(batch).await.is_ok());
266266

267267
let mut first = store
268-
.claim_activations_for_push(None, None, Some(1), Some((15, 25)))
268+
.claim_activations_for_push(Some(1), Some((15, 25)))
269269
.await
270270
.unwrap();
271271
assert_eq!(first.len(), 1);
@@ -274,7 +274,7 @@ async fn test_get_pending_activation_bucket_filter(#[case] adapter: &str) {
274274
assert_eq!(first.bucket, 20);
275275

276276
let mut second = store
277-
.claim_activations_for_push(None, None, Some(1), Some((0, 15)))
277+
.claim_activations_for_push(Some(1), Some((0, 15)))
278278
.await
279279
.unwrap();
280280
assert_eq!(second.len(), 1);
@@ -284,7 +284,7 @@ async fn test_get_pending_activation_bucket_filter(#[case] adapter: &str) {
284284

285285
assert!(
286286
store
287-
.claim_activations_for_push(None, None, Some(1), Some((15, 25)))
287+
.claim_activations_for_push(Some(1), Some((15, 25)))
288288
.await
289289
.unwrap()
290290
.is_empty()
@@ -606,10 +606,7 @@ async fn test_get_pending_activations_no_limit(#[case] adapter: &str) {
606606
let batch = make_activations(N as u32);
607607
assert!(store.store(batch).await.is_ok());
608608

609-
let got = store
610-
.claim_activations_for_push(None, None, None, None)
611-
.await
612-
.unwrap();
609+
let got = store.claim_activations_for_push(None, None).await.unwrap();
613610
assert_eq!(got.len(), N);
614611
assert!(
615612
got.iter()
@@ -641,7 +638,7 @@ async fn test_get_pending_activations_limit_below_pending(#[case] adapter: &str)
641638
assert!(store.store(batch).await.is_ok());
642639

643640
let got = store
644-
.claim_activations_for_push(None, None, Some(X), None)
641+
.claim_activations_for_push(Some(X), None)
645642
.await
646643
.unwrap();
647644
assert_eq!(got.len(), X as usize);
@@ -678,7 +675,7 @@ async fn test_get_pending_activations_limit_above_pending(#[case] adapter: &str)
678675
assert!(store.store(batch).await.is_ok());
679676

680677
let got = store
681-
.claim_activations_for_push(None, None, Some(X), None)
678+
.claim_activations_for_push(Some(X), None)
682679
.await
683680
.unwrap();
684681
assert_eq!(got.len(), Y);

src/store/traits.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,22 +30,10 @@ pub trait InflightActivationStore: Send + Sync {
3030
/// Claims `limit` activations within the `bucket` range. Push mode uses status `Claimed` until `mark_activation_processing` moves to `Processing`.
3131
async fn claim_activations_for_push(
3232
&self,
33-
application: Option<&str>,
34-
namespaces: Option<&[String]>,
3533
limit: Option<i32>,
3634
bucket: Option<BucketRange>,
3735
) -> Result<Vec<InflightActivation>, Error> {
38-
// If a namespace filter is used, an application must also be used
39-
if namespaces.is_some() && application.is_none() {
40-
warn!(
41-
?namespaces,
42-
"Received request for namespaced task without application"
43-
);
44-
45-
return Ok(vec![]);
46-
}
47-
48-
self.claim_activations(application, namespaces, limit, bucket, false)
36+
self.claim_activations(None, None, limit, bucket, false)
4937
.await
5038
}
5139

src/test_utils.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -336,8 +336,8 @@ pub fn create_integration_config_with_ssl() -> Arc<Config> {
336336
Arc::new(config)
337337
}
338338

339-
pub fn create_integration_config_with_topic(topic: String) -> Arc<Config> {
340-
let config = Config {
339+
pub fn create_integration_config_with_topic(topic: String) -> Config {
340+
Config {
341341
pg_host: get_pg_host(),
342342
pg_port: get_pg_port(),
343343
pg_username: get_pg_username(),
@@ -347,9 +347,7 @@ pub fn create_integration_config_with_topic(topic: String) -> Arc<Config> {
347347
kafka_topic: topic,
348348
kafka_auto_offset_reset: "earliest".into(),
349349
..Config::default()
350-
};
351-
352-
Arc::new(config)
350+
}
353351
}
354352

355353
/// Create a kafka producer for a given config

0 commit comments

Comments
 (0)