Skip to content

Commit f2b7ab7

Browse files
authored
[Storage] Make QMDB stream_range: Send (#3679)
1 parent cc18999 commit f2b7ab7

2 files changed

Lines changed: 24 additions & 8 deletions

File tree

storage/src/qmdb/any/ordered/mod.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,9 @@ where
156156
where
157157
V: 'a,
158158
{
159-
let start_iter = self.snapshot.get(&start);
160-
let mut init_pending = self.fetch_all_updates(start_iter).await?;
159+
// Collect to avoid holding a borrow across await points (rust-lang/rust#100013).
160+
let start_locs: Vec<Location<F>> = self.snapshot.get(&start).copied().collect();
161+
let mut init_pending = self.fetch_all_updates(start_locs.iter()).await?;
161162
init_pending.retain(|x| x.key >= start);
162163

163164
Ok(stream::unfold(
@@ -168,16 +169,21 @@ where
168169
return Some((Ok((item.key, item.value)), (driver_key, pending)));
169170
}
170171

171-
let Some((iter, wrapped)) = self.snapshot.next_translated_key(&driver_key) else {
172-
return None; // DB is empty
172+
// Collect to avoid holding a borrow across await points (rust-lang/rust#100013).
173+
let locs: Vec<Location<F>> = {
174+
let Some((iter, wrapped)) = self.snapshot.next_translated_key(&driver_key)
175+
else {
176+
return None; // DB is empty
177+
};
178+
if wrapped {
179+
return None; // End of DB
180+
}
181+
iter.copied().collect()
173182
};
174-
if wrapped {
175-
return None; // End of DB
176-
}
177183

178184
// TODO(https://github.com/commonwarexyz/monorepo/issues/2527): concurrently
179185
// fetch a much larger batch of "pending" keys.
180-
match self.fetch_all_updates(iter).await {
186+
match self.fetch_all_updates(locs.iter()).await {
181187
Ok(mut pending) => {
182188
let item = pending.pop().expect("pending is not empty");
183189
let key = item.key.clone();

storage/src/qmdb/current/ordered/variable.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,16 @@ mod test {
119119
type CurrentTest =
120120
super::Db<mmr::Family, deterministic::Context, Digest, Digest, Sha256, OneCap, 32>;
121121

122+
#[allow(dead_code)]
123+
fn _assert_stream_range_is_send(db: &CurrentTest, start: Digest) {
124+
fn require_send<F: core::future::Future + Send>(_: F) {}
125+
require_send(async move {
126+
let stream = db.stream_range(start).await.unwrap();
127+
futures::pin_mut!(stream);
128+
let _ = futures::StreamExt::next(&mut stream).await;
129+
});
130+
}
131+
122132
/// Return a [Db] database initialized with a variable config.
123133
async fn open_db(context: deterministic::Context, partition_prefix: String) -> CurrentTest {
124134
let cfg = variable_config::<OneCap>(&partition_prefix, &context);

0 commit comments

Comments
 (0)