Skip to content

Commit 67dd29a

Browse files
authored
refactor(perf): Pool ActiveQuerys in the query stack (#629)
* Do not pass ownership of the `QueryStack` in `DependencyGraph::block_on` * Pool `ActiveQuery`s in the query stack * Don't do unnecessary work when discaring the query guard * Gate debug only field by `debug_assertions` * Reduce visibility
1 parent d758691 commit 67dd29a

File tree

6 files changed

+320
-123
lines changed

6 files changed

+320
-123
lines changed

src/accumulator/accumulated_map.rs

+4
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ impl AccumulatedMap {
4848
pub fn is_empty(&self) -> bool {
4949
self.map.is_empty()
5050
}
51+
52+
pub fn clear(&mut self) {
53+
self.map.clear()
54+
}
5155
}
5256

5357
/// Tracks whether any input read during a query's execution has any accumulated values.

src/active_query.rs

+190-33
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use std::ops::Not;
2+
use std::{mem, ops};
23

34
use super::zalsa_local::{QueryEdges, QueryOrigin, QueryRevisions};
45
use crate::accumulator::accumulated_map::AtomicInputAccumulatedValues;
6+
use crate::runtime::Stamp;
57
use crate::tracked_struct::{DisambiguatorMap, IdentityHash, IdentityMap};
68
use crate::zalsa_local::QueryEdge;
79
use crate::{
@@ -13,18 +15,19 @@ use crate::{
1315
tracked_struct::Disambiguator,
1416
Revision,
1517
};
18+
use crate::{Accumulator, IngredientIndex};
1619

1720
#[derive(Debug)]
1821
pub(crate) struct ActiveQuery {
1922
/// What query is executing
2023
pub(crate) database_key_index: DatabaseKeyIndex,
2124

2225
/// Minimum durability of inputs observed so far.
23-
pub(crate) durability: Durability,
26+
durability: Durability,
2427

2528
/// Maximum revision of all inputs observed. If we observe an
2629
/// untracked read, this will be set to the most recent revision.
27-
pub(crate) changed_at: Revision,
30+
changed_at: Revision,
2831

2932
/// Inputs: Set of subqueries that were accessed thus far.
3033
/// Outputs: Tracks values written by this query. Could be...
@@ -52,32 +55,17 @@ pub(crate) struct ActiveQuery {
5255

5356
/// Stores the values accumulated to the given ingredient.
5457
/// The type of accumulated value is erased but known to the ingredient.
55-
pub(crate) accumulated: AccumulatedMap,
58+
accumulated: AccumulatedMap,
5659

5760
/// [`InputAccumulatedValues::Empty`] if any input read during the query's execution
5861
/// has any accumulated values.
59-
pub(super) accumulated_inputs: InputAccumulatedValues,
62+
accumulated_inputs: InputAccumulatedValues,
6063

6164
/// Provisional cycle results that this query depends on.
62-
pub(crate) cycle_heads: CycleHeads,
65+
cycle_heads: CycleHeads,
6366
}
6467

6568
impl ActiveQuery {
66-
pub(super) fn new(database_key_index: DatabaseKeyIndex) -> Self {
67-
ActiveQuery {
68-
database_key_index,
69-
durability: Durability::MAX,
70-
changed_at: Revision::start(),
71-
input_outputs: FxIndexSet::default(),
72-
untracked_read: false,
73-
disambiguator_map: Default::default(),
74-
tracked_struct_ids: Default::default(),
75-
accumulated: Default::default(),
76-
accumulated_inputs: Default::default(),
77-
cycle_heads: Default::default(),
78-
}
79-
}
80-
8169
pub(super) fn add_read(
8270
&mut self,
8371
input: DatabaseKeyIndex,
@@ -116,6 +104,10 @@ impl ActiveQuery {
116104
self.changed_at = self.changed_at.max(revision);
117105
}
118106

107+
pub(super) fn accumulate(&mut self, index: IngredientIndex, value: impl Accumulator) {
108+
self.accumulated.accumulate(index, value);
109+
}
110+
119111
/// Adds a key to our list of outputs.
120112
pub(super) fn add_output(&mut self, key: DatabaseKeyIndex) {
121113
self.input_outputs.insert(QueryEdge::Output(key));
@@ -126,30 +118,195 @@ impl ActiveQuery {
126118
self.input_outputs.contains(&QueryEdge::Output(key))
127119
}
128120

129-
pub(crate) fn into_revisions(self) -> QueryRevisions {
130-
let edges = QueryEdges::new(self.input_outputs);
131-
let origin = if self.untracked_read {
121+
pub(super) fn disambiguate(&mut self, key: IdentityHash) -> Disambiguator {
122+
self.disambiguator_map.disambiguate(key)
123+
}
124+
125+
pub(super) fn stamp(&self) -> Stamp {
126+
Stamp {
127+
value: (),
128+
durability: self.durability,
129+
changed_at: self.changed_at,
130+
}
131+
}
132+
}
133+
134+
impl ActiveQuery {
135+
fn new(database_key_index: DatabaseKeyIndex) -> Self {
136+
ActiveQuery {
137+
database_key_index,
138+
durability: Durability::MAX,
139+
changed_at: Revision::start(),
140+
input_outputs: FxIndexSet::default(),
141+
untracked_read: false,
142+
disambiguator_map: Default::default(),
143+
tracked_struct_ids: Default::default(),
144+
accumulated: Default::default(),
145+
accumulated_inputs: Default::default(),
146+
cycle_heads: Default::default(),
147+
}
148+
}
149+
150+
fn top_into_revisions(&mut self) -> QueryRevisions {
151+
let &mut Self {
152+
database_key_index: _,
153+
durability,
154+
changed_at,
155+
ref mut input_outputs,
156+
untracked_read,
157+
ref mut disambiguator_map,
158+
ref mut tracked_struct_ids,
159+
ref mut accumulated,
160+
accumulated_inputs,
161+
ref mut cycle_heads,
162+
} = self;
163+
164+
let edges = QueryEdges::new(input_outputs.drain(..));
165+
let origin = if untracked_read {
132166
QueryOrigin::DerivedUntracked(edges)
133167
} else {
134168
QueryOrigin::Derived(edges)
135169
};
136-
let accumulated = self
137-
.accumulated
170+
disambiguator_map.clear();
171+
let accumulated = accumulated
138172
.is_empty()
139173
.not()
140-
.then(|| Box::new(self.accumulated));
174+
.then(|| Box::new(mem::take(accumulated)));
175+
let tracked_struct_ids = mem::take(tracked_struct_ids);
176+
let accumulated_inputs = AtomicInputAccumulatedValues::new(accumulated_inputs);
177+
let cycle_heads = mem::take(cycle_heads);
141178
QueryRevisions {
142-
changed_at: self.changed_at,
179+
changed_at,
180+
durability,
143181
origin,
144-
durability: self.durability,
145-
tracked_struct_ids: self.tracked_struct_ids,
146-
accumulated_inputs: AtomicInputAccumulatedValues::new(self.accumulated_inputs),
182+
tracked_struct_ids,
183+
accumulated_inputs,
147184
accumulated,
148-
cycle_heads: self.cycle_heads,
185+
cycle_heads,
149186
}
150187
}
151188

152-
pub(super) fn disambiguate(&mut self, key: IdentityHash) -> Disambiguator {
153-
self.disambiguator_map.disambiguate(key)
189+
fn clear(&mut self) {
190+
let Self {
191+
database_key_index: _,
192+
durability: _,
193+
changed_at: _,
194+
input_outputs,
195+
untracked_read: _,
196+
disambiguator_map,
197+
tracked_struct_ids,
198+
accumulated,
199+
accumulated_inputs: _,
200+
cycle_heads,
201+
} = self;
202+
input_outputs.clear();
203+
disambiguator_map.clear();
204+
tracked_struct_ids.clear();
205+
accumulated.clear();
206+
*cycle_heads = Default::default();
207+
}
208+
209+
fn reset_for(&mut self, new_database_key_index: DatabaseKeyIndex) {
210+
let Self {
211+
database_key_index,
212+
durability,
213+
changed_at,
214+
input_outputs,
215+
untracked_read,
216+
disambiguator_map,
217+
tracked_struct_ids,
218+
accumulated,
219+
accumulated_inputs,
220+
cycle_heads,
221+
} = self;
222+
*database_key_index = new_database_key_index;
223+
*durability = Durability::MAX;
224+
*changed_at = Revision::start();
225+
*untracked_read = false;
226+
*accumulated_inputs = Default::default();
227+
debug_assert!(
228+
input_outputs.is_empty(),
229+
"`ActiveQuery::clear` or `ActiveQuery::into_revisions` should've been called"
230+
);
231+
debug_assert!(
232+
disambiguator_map.is_empty(),
233+
"`ActiveQuery::clear` or `ActiveQuery::into_revisions` should've been called"
234+
);
235+
debug_assert!(
236+
tracked_struct_ids.is_empty(),
237+
"`ActiveQuery::clear` or `ActiveQuery::into_revisions` should've been called"
238+
);
239+
debug_assert!(
240+
cycle_heads.is_empty(),
241+
"`ActiveQuery::clear` or `ActiveQuery::into_revisions` should've been called"
242+
);
243+
debug_assert!(
244+
accumulated.is_empty(),
245+
"`ActiveQuery::clear` or `ActiveQuery::into_revisions` should've been called"
246+
);
247+
}
248+
}
249+
250+
#[derive(Debug, Default)]
251+
pub(crate) struct QueryStack {
252+
stack: Vec<ActiveQuery>,
253+
len: usize,
254+
}
255+
256+
impl ops::Deref for QueryStack {
257+
type Target = [ActiveQuery];
258+
259+
fn deref(&self) -> &Self::Target {
260+
&self.stack[..self.len]
261+
}
262+
}
263+
264+
impl ops::DerefMut for QueryStack {
265+
fn deref_mut(&mut self) -> &mut Self::Target {
266+
&mut self.stack[..self.len]
267+
}
268+
}
269+
270+
impl QueryStack {
271+
pub(crate) fn push_new_query(&mut self, database_key_index: DatabaseKeyIndex) {
272+
if self.len < self.stack.len() {
273+
self.stack[self.len].reset_for(database_key_index);
274+
} else {
275+
self.stack.push(ActiveQuery::new(database_key_index));
276+
}
277+
self.len += 1;
278+
}
279+
280+
#[cfg(debug_assertions)]
281+
pub(crate) fn len(&self) -> usize {
282+
self.len
283+
}
284+
285+
pub(crate) fn pop_into_revisions(
286+
&mut self,
287+
key: DatabaseKeyIndex,
288+
#[cfg(debug_assertions)] push_len: usize,
289+
) -> QueryRevisions {
290+
#[cfg(debug_assertions)]
291+
assert_eq!(push_len, self.len(), "unbalanced push/pop");
292+
debug_assert_ne!(self.len, 0, "too many pops");
293+
self.len -= 1;
294+
debug_assert_eq!(
295+
self.stack[self.len].database_key_index, key,
296+
"unbalanced push/pop"
297+
);
298+
self.stack[self.len].top_into_revisions()
299+
}
300+
301+
pub(crate) fn pop(&mut self, key: DatabaseKeyIndex, #[cfg(debug_assertions)] push_len: usize) {
302+
#[cfg(debug_assertions)]
303+
assert_eq!(push_len, self.len(), "unbalanced push/pop");
304+
debug_assert_ne!(self.len, 0, "too many pops");
305+
self.len -= 1;
306+
debug_assert_eq!(
307+
self.stack[self.len].database_key_index, key,
308+
"unbalanced push/pop"
309+
);
310+
self.stack[self.len].clear()
154311
}
155312
}

src/runtime.rs

+3-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use std::{
2-
mem,
32
panic::AssertUnwindSafe,
43
sync::atomic::{AtomicBool, Ordering},
54
thread::ThreadId,
@@ -196,16 +195,14 @@ impl Runtime {
196195
// we cannot enter an inconsistent state for this parameter.
197196
let query_mutex_guard = AssertUnwindSafe(query_mutex_guard);
198197
let result = local_state.with_query_stack(|stack| {
199-
let (new_stack, result) = DependencyGraph::block_on(
198+
DependencyGraph::block_on(
200199
{ dg }.0,
201200
thread_id,
202201
database_key,
203202
other_id,
204-
mem::take(stack),
203+
stack,
205204
{ query_mutex_guard }.0,
206-
);
207-
*stack = new_stack;
208-
result
205+
)
209206
});
210207

211208
match result {

0 commit comments

Comments
 (0)