Skip to content

Commit 066c8a6

Browse files
committed
Bound author enrichment concurrency
1 parent 9e83667 commit 066c8a6

1 file changed

Lines changed: 87 additions & 49 deletions

File tree

src/ui/app.rs

Lines changed: 87 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@ use crossterm::event::{
2222
self, Event, KeyCode, KeyEvent, KeyEventKind, KeyModifiers, MouseButton, MouseEvent,
2323
MouseEventKind,
2424
};
25+
use parking_lot::Mutex;
2526
use ratatui::prelude::*;
2627
use ratatui::widgets::{Block, Borders, Paragraph};
2728
use std::collections::HashSet;
2829
use std::path::PathBuf;
2930
use std::sync::mpsc::{self, Receiver, TryRecvError};
31+
use std::sync::Arc;
3032
use std::time::{Duration, Instant};
3133

3234
#[derive(Debug, Clone)]
@@ -88,6 +90,8 @@ fn osc52_copy(text: &str) {
8890
let _ = stdout.flush();
8991
}
9092

93+
const MAX_AUTHOR_ENRICHMENT_WORKERS: usize = 4;
94+
9195
pub struct App {
9296
state: AppState,
9397
config: Config,
@@ -438,6 +442,18 @@ impl App {
438442
.collect()
439443
}
440444

445+
fn author_enrichment_worker_count(item_count: usize) -> usize {
446+
if item_count == 0 {
447+
return 0;
448+
}
449+
450+
let available = std::thread::available_parallelism()
451+
.map(|parallelism| parallelism.get())
452+
.unwrap_or(MAX_AUTHOR_ENRICHMENT_WORKERS);
453+
454+
item_count.min(available).min(MAX_AUTHOR_ENRICHMENT_WORKERS)
455+
}
456+
441457
/// Open a URL in the browser using custom command if configured, otherwise system default.
442458
fn open_url_in_browser(&self, url: &str) -> std::io::Result<()> {
443459
if let Some(ref browser_cmd) = self.config.browser_command {
@@ -646,78 +662,93 @@ impl App {
646662
}
647663

648664
let (tx, rx) = mpsc::channel::<EnrichmentResult>();
665+
let worker_count = Self::author_enrichment_worker_count(to_fetch.len());
649666

650667
std::thread::spawn(move || {
651668
// (id, "author"|"context", value)
652669
let (inner_tx, inner_rx) = mpsc::channel::<(String, &'static str, String)>();
653670

654-
for item in to_fetch {
671+
let work_queue = Arc::new(Mutex::new(std::collections::VecDeque::from(to_fetch)));
672+
let mut workers = Vec::with_capacity(worker_count);
673+
674+
for _ in 0..worker_count {
655675
let client = client.clone();
656676
let inner_tx = inner_tx.clone();
657-
std::thread::spawn(move || {
658-
// Fetch author from latest_comment_url
659-
if let Some(url) = &item.comment_url {
660-
if let Ok(Some(author)) = client.get_comment_author(url) {
661-
let _ = inner_tx.send((item.id.clone(), "author", author));
677+
let work_queue = Arc::clone(&work_queue);
678+
workers.push(std::thread::spawn(move || {
679+
loop {
680+
let Some(item) = work_queue.lock().pop_front() else {
681+
break;
682+
};
683+
684+
// Fetch author from latest_comment_url
685+
if let Some(url) = &item.comment_url {
686+
if let Ok(Some(author)) = client.get_comment_author(url) {
687+
let _ = inner_tx.send((item.id.clone(), "author", author));
688+
}
662689
}
663-
}
664690

665-
// Fetch subject state for state_change notifications
666-
if let Some(url) = &item.subject_url {
667-
if let Ok(value) = client.get_json_by_url(url) {
668-
let context = if item.is_pr {
669-
let merged = value
670-
.get("merged")
671-
.and_then(|v| v.as_bool())
672-
.unwrap_or(false);
673-
if merged {
674-
// Also extract merged_by as author fallback
675-
if item.comment_url.is_none() {
676-
if let Some(login) = value
677-
.get("merged_by")
678-
.and_then(|u| u.get("login"))
679-
.and_then(|l| l.as_str())
680-
{
681-
let _ = inner_tx.send((
682-
item.id.clone(),
683-
"author",
684-
login.to_string(),
685-
));
691+
// Fetch subject state for state_change notifications
692+
if let Some(url) = &item.subject_url {
693+
if let Ok(value) = client.get_json_by_url(url) {
694+
let context = if item.is_pr {
695+
let merged = value
696+
.get("merged")
697+
.and_then(|v| v.as_bool())
698+
.unwrap_or(false);
699+
if merged {
700+
// Also extract merged_by as author fallback
701+
if item.comment_url.is_none() {
702+
if let Some(login) = value
703+
.get("merged_by")
704+
.and_then(|u| u.get("login"))
705+
.and_then(|l| l.as_str())
706+
{
707+
let _ = inner_tx.send((
708+
item.id.clone(),
709+
"author",
710+
login.to_string(),
711+
));
712+
}
686713
}
714+
"merged".to_string()
715+
} else {
716+
value
717+
.get("state")
718+
.and_then(|v| v.as_str())
719+
.unwrap_or("open")
720+
.to_lowercase()
687721
}
688-
"merged".to_string()
689722
} else {
690-
value
723+
let state = value
691724
.get("state")
692725
.and_then(|v| v.as_str())
693726
.unwrap_or("open")
694-
.to_lowercase()
695-
}
696-
} else {
697-
let state = value
698-
.get("state")
699-
.and_then(|v| v.as_str())
700-
.unwrap_or("open")
701-
.to_lowercase();
702-
if state == "closed" {
703-
if let Some(reason) =
704-
value.get("state_reason").and_then(|v| v.as_str())
705-
{
706-
format!("closed:{reason}")
727+
.to_lowercase();
728+
if state == "closed" {
729+
if let Some(reason) =
730+
value.get("state_reason").and_then(|v| v.as_str())
731+
{
732+
format!("closed:{reason}")
733+
} else {
734+
state
735+
}
707736
} else {
708737
state
709738
}
710-
} else {
711-
state
712-
}
713-
};
714-
let _ = inner_tx.send((item.id.clone(), "context", context));
739+
};
740+
let _ = inner_tx.send((item.id.clone(), "context", context));
741+
}
715742
}
716743
}
717-
});
744+
}));
718745
}
719746
drop(inner_tx);
720747

748+
for worker in workers {
749+
let _ = worker.join();
750+
}
751+
721752
let mut authors = std::collections::HashMap::new();
722753
let mut contexts = std::collections::HashMap::new();
723754
for (id, kind, value) in inner_rx {
@@ -3834,6 +3865,13 @@ mod tests {
38343865
assert!(app.previous_notification_ids.is_empty());
38353866
}
38363867

3868+
#[test]
3869+
fn author_enrichment_worker_count_is_bounded() {
3870+
assert_eq!(App::author_enrichment_worker_count(0), 0);
3871+
assert_eq!(App::author_enrichment_worker_count(1), 1);
3872+
assert!(App::author_enrichment_worker_count(128) <= MAX_AUTHOR_ENRICHMENT_WORKERS);
3873+
}
3874+
38373875
#[test]
38383876
fn mark_all_confirm_message_mentions_filtered_notifications() {
38393877
let mut app = App::new(Config::default());

0 commit comments

Comments
 (0)