Skip to content

Commit 8965244

Browse files
authored
Refactor index_in_parallel (#220)
This is an attempt to separate the thread worker logic from the actual indexing to make this function easier to reason about
1 parent 42654ff commit 8965244

File tree

1 file changed

+70
-48
lines changed

1 file changed

+70
-48
lines changed

rust/saturn/src/indexing.rs

Lines changed: 70 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,69 @@ impl Document {
6969
/// This function will panic in the event of a thread dead lock, which indicates a bug in our implementation. There
7070
/// should not be any code that tries to lock the same mutex multiple times in the same thread
7171
pub fn index_in_parallel(graph: &mut Graph, documents: Vec<Document>) -> Result<(), MultipleErrors> {
72+
let results = with_parallel_workers(documents, index_document);
73+
74+
let mut all_errors = Vec::new();
75+
for (local_graph, errors) in results {
76+
graph.update(local_graph);
77+
all_errors.extend(errors);
78+
}
79+
80+
if all_errors.is_empty() {
81+
Ok(())
82+
} else {
83+
Err(MultipleErrors(all_errors))
84+
}
85+
}
86+
87+
/// Indexes a single document and returns the indexer parts
88+
fn index_document(document: &Document) -> IndexerParts {
89+
let (source, errors) = read_document_source(document);
90+
91+
let converter = UTF8SourceLocationConverter::new(&source);
92+
let mut ruby_indexer = RubyIndexer::new(document.uri.to_string(), &converter, &source);
93+
94+
if errors.is_empty() {
95+
ruby_indexer.index();
96+
} else {
97+
for error in errors {
98+
ruby_indexer.add_error(error);
99+
}
100+
}
101+
102+
ruby_indexer.into_parts()
103+
}
104+
105+
/// Reads the source content from a document, either from memory or disk
106+
fn read_document_source(document: &Document) -> (String, Vec<IndexingError>) {
107+
let mut errors = Vec::new();
108+
109+
let source = if let Some(source) = &document.source {
110+
source.clone()
111+
} else {
112+
match document.path() {
113+
Ok(path) => fs::read_to_string(&path).unwrap_or_else(|e| {
114+
errors.push(IndexingError::FileReadError(format!(
115+
"Failed to read {}: {}",
116+
path.display(),
117+
e
118+
)));
119+
String::new()
120+
}),
121+
Err(e) => {
122+
errors.push(e);
123+
String::new()
124+
}
125+
}
126+
};
127+
128+
(source, errors)
129+
}
130+
131+
fn with_parallel_workers<F>(documents: Vec<Document>, worker_fn: F) -> Vec<IndexerParts>
132+
where
133+
F: Fn(&Document) -> IndexerParts + Send + Clone + 'static,
134+
{
72135
let (tx, rx): (Sender<IndexerParts>, Receiver<IndexerParts>) = mpsc::channel();
73136
let num_threads = thread::available_parallelism().map(std::num::NonZero::get).unwrap_or(4);
74137
let mut threads = Vec::with_capacity(num_threads);
@@ -77,47 +140,13 @@ pub fn index_in_parallel(graph: &mut Graph, documents: Vec<Document>) -> Result<
77140
for _ in 0..num_threads {
78141
let thread_tx = tx.clone();
79142
let queue = Arc::clone(&document_queue);
143+
let thread_fn = worker_fn.clone();
80144

81145
let handle = thread::spawn(move || {
82146
while let Some(document) = { queue.lock().unwrap().pop() } {
83-
let (source, errors) = {
84-
let mut errors = Vec::new();
85-
86-
let source: String = if let Some(source) = &document.source {
87-
source.clone()
88-
} else {
89-
match document.path() {
90-
Ok(path) => fs::read_to_string(&path).unwrap_or_else(|e| {
91-
errors.push(IndexingError::FileReadError(format!(
92-
"Failed to read {}: {}",
93-
path.display(),
94-
e
95-
)));
96-
String::new()
97-
}),
98-
Err(e) => {
99-
errors.push(e);
100-
String::new()
101-
}
102-
}
103-
};
104-
105-
(source, errors)
106-
};
107-
108-
let converter = UTF8SourceLocationConverter::new(&source);
109-
let mut ruby_indexer = RubyIndexer::new(document.uri.to_string(), &converter, &source);
110-
111-
if errors.is_empty() {
112-
ruby_indexer.index();
113-
} else {
114-
for error in errors {
115-
ruby_indexer.add_error(error);
116-
}
117-
}
118-
147+
let result = thread_fn(&document);
119148
thread_tx
120-
.send(ruby_indexer.into_parts())
149+
.send(result)
121150
.expect("Receiver end should not be closed until all threads are done");
122151
}
123152
});
@@ -127,19 +156,12 @@ pub fn index_in_parallel(graph: &mut Graph, documents: Vec<Document>) -> Result<
127156

128157
drop(tx);
129158

130-
// Insert all discovered definitions into the global graph
131-
let mut all_errors = Vec::new();
132-
133-
for (local_graph, errors) in rx {
134-
graph.update(local_graph);
135-
all_errors.extend(errors);
159+
let mut results = Vec::new();
160+
for result in rx {
161+
results.push(result);
136162
}
137163

138-
if all_errors.is_empty() {
139-
Ok(())
140-
} else {
141-
Err(MultipleErrors(all_errors))
142-
}
164+
results
143165
}
144166

145167
/// Recursively collects all Ruby files for the given workspace and dependencies, returning a vector of document instances

0 commit comments

Comments
 (0)