Skip to content

Commit 50134e4

Browse files
add resolver
1 parent 15cc3d8 commit 50134e4

1 file changed

Lines changed: 24 additions & 49 deletions

File tree

follower/src/resolver.rs

Lines changed: 24 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -108,21 +108,21 @@ pub struct Actor<E: Spawner, C: Source> {
108108
requests: HashMap<handler::Request<Digest>, State>,
109109
retry_schedule: BTreeSet<(SystemTime, handler::Request<Digest>)>,
110110
fetch_retry_timeout: Duration,
111-
next_request_id: u64,
111+
next_id: u64,
112112
}
113113

114114
enum State {
115-
// A fetch is currently running. The request id lets us ignore stale
115+
// A fetch is currently running. The id lets us ignore stale
116116
// completions from an earlier attempt for the same key, and dropping the
117117
// aborter cancels the current attempt.
118-
Active { request_id: u64, aborter: Aborter },
118+
Active { id: u64, aborter: Aborter },
119119
// A retry is queued for the recorded deadline.
120120
Scheduled(SystemTime),
121121
}
122122

123123
struct FetchResult {
124124
key: handler::Request<Digest>,
125-
request_id: u64,
125+
id: u64,
126126
retry: bool,
127127
}
128128

@@ -146,7 +146,7 @@ impl<E: Spawner + Clock + CryptoRng + RngCore, C: Source> Actor<E, C> {
146146
requests: HashMap::new(),
147147
retry_schedule: BTreeSet::new(),
148148
fetch_retry_timeout,
149-
next_request_id: 0,
149+
next_id: 0,
150150
};
151151

152152
let handle = Resolver { mailbox_tx };
@@ -239,53 +239,40 @@ impl<E: Spawner + Clock + CryptoRng + RngCore, C: Source> Actor<E, C> {
239239
}
240240

241241
fn start_fetch(&mut self, key: handler::Request<Digest>) {
242-
let request_id = self.next_request_id;
243-
self.next_request_id = self
244-
.next_request_id
245-
.checked_add(1)
246-
.expect("request id overflow");
247-
let future = Self::process_fetch(
248-
key.clone(),
249-
request_id,
250-
self.client.clone(),
251-
self.handler.clone(),
252-
);
242+
let id = self.next_id;
243+
self.next_id = self.next_id.wrapping_add(1);
244+
let future =
245+
Self::process_fetch(key.clone(), id, self.client.clone(), self.handler.clone());
253246
let aborter = self.in_flight.push(future);
254-
let previous = self.requests.insert(
255-
key,
256-
State::Active {
257-
request_id,
258-
aborter,
259-
},
260-
);
247+
let previous = self.requests.insert(key, State::Active { id, aborter });
261248
assert!(previous.is_none(), "request state already existed");
262249
}
263250

264251
fn handle_completed(&mut self, result: FetchResult) {
265252
let Some(state) = self.requests.get(&result.key) else {
266253
trace!(
267254
?result.key,
268-
request_id = result.request_id,
255+
id = result.id,
269256
"ignoring stale fetch completion for removed request"
270257
);
271258
return;
272259
};
273260

274261
match state {
275-
State::Active { request_id, .. } if *request_id == result.request_id => {}
276-
State::Active { request_id, .. } => {
262+
State::Active { id, .. } if *id == result.id => {}
263+
State::Active { id, .. } => {
277264
trace!(
278265
?result.key,
279-
completed_request_id = result.request_id,
280-
active_request_id = *request_id,
266+
completed_id = result.id,
267+
active_id = *id,
281268
"ignoring stale fetch completion for replaced request"
282269
);
283270
return;
284271
}
285272
State::Scheduled(deadline) => {
286273
trace!(
287274
?result.key,
288-
request_id = result.request_id,
275+
id = result.id,
289276
?deadline,
290277
"ignoring stale fetch completion for scheduled request"
291278
);
@@ -297,7 +284,7 @@ impl<E: Spawner + Clock + CryptoRng + RngCore, C: Source> Actor<E, C> {
297284
assert!(
298285
matches!(
299286
removed,
300-
Some(State::Active { request_id, .. }) if request_id == result.request_id
287+
Some(State::Active { id, .. }) if id == result.id
301288
),
302289
"active request state missing for completed fetch"
303290
);
@@ -358,7 +345,7 @@ impl<E: Spawner + Clock + CryptoRng + RngCore, C: Source> Actor<E, C> {
358345

359346
async fn process_fetch(
360347
key: handler::Request<Digest>,
361-
request_id: u64,
348+
id: u64,
362349
client: C,
363350
handler: handler::Handler<Digest>,
364351
) -> FetchResult {
@@ -373,11 +360,7 @@ impl<E: Spawner + Clock + CryptoRng + RngCore, C: Source> Actor<E, C> {
373360
Self::fetch_notarized_by_round(*round, client, handler).await
374361
}
375362
};
376-
FetchResult {
377-
key,
378-
request_id,
379-
retry,
380-
}
363+
FetchResult { key, id, retry }
381364
}
382365

383366
async fn fetch_block_by_digest(
@@ -912,33 +895,25 @@ mod tests {
912895
let key = handler::Request::<Digest>::Block(digest);
913896

914897
actor.start_fetch(key.clone());
915-
let Some(State::Active {
916-
request_id: first_request_id,
917-
..
918-
}) = actor.requests.remove(&key)
919-
else {
898+
let Some(State::Active { id: first_id, .. }) = actor.requests.remove(&key) else {
920899
panic!("expected first fetch attempt to be active");
921900
};
922901

923902
actor.start_fetch(key.clone());
924-
let Some(State::Active {
925-
request_id: second_request_id,
926-
..
927-
}) = actor.requests.get(&key)
928-
else {
903+
let Some(State::Active { id: second_id, .. }) = actor.requests.get(&key) else {
929904
panic!("expected second fetch attempt to be active");
930905
};
931-
let second_request_id = *second_request_id;
906+
let second_id = *second_id;
932907

933908
actor.handle_completed(FetchResult {
934909
key: key.clone(),
935-
request_id: first_request_id,
910+
id: first_id,
936911
retry: true,
937912
});
938913

939914
assert!(matches!(
940915
actor.requests.get(&key),
941-
Some(State::Active { request_id, .. }) if *request_id == second_request_id
916+
Some(State::Active { id, .. }) if *id == second_id
942917
));
943918
assert!(
944919
actor.retry_schedule.is_empty(),

0 commit comments

Comments
 (0)