Skip to content

Commit 051c116

Browse files
committed
Fix attempt
1 parent 781c44e commit 051c116

File tree

5 files changed

+74
-54
lines changed

5 files changed

+74
-54
lines changed

.github/workflows/deploy-profiling.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ on:
99
push:
1010
branches:
1111
- jemalloc-profiling-support
12+
- ilya/mem-leak-experiment
1213

1314
jobs:
1415
deploy-profiling:

crates/driver/src/infra/tokens.rs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,7 @@ impl Fetcher {
3939
cache: RwLock::new(HashMap::new()),
4040
requests: BoxRequestSharing::labelled("token_info".into()),
4141
});
42-
tokio::task::spawn(
43-
update_task(block_stream, Arc::downgrade(&inner))
44-
.instrument(tracing::info_span!("token_fetcher")),
45-
);
42+
tokio::task::spawn(update_task(block_stream, Arc::downgrade(&inner)));
4643
Self(inner)
4744
}
4845

@@ -61,14 +58,23 @@ impl Fetcher {
6158
/// fetcher is dropped.
6259
async fn update_task(blocks: CurrentBlockWatcher, inner: std::sync::Weak<Inner>) {
6360
let mut stream = block_stream::into_stream(blocks);
64-
while stream.next().await.is_some() {
65-
let inner = match inner.upgrade() {
66-
Some(inner) => inner,
67-
// Fetcher was dropped, stop update task.
68-
None => break,
69-
};
70-
if let Err(err) = update_balances(inner).await {
71-
tracing::warn!(?err, "error updating token cache");
61+
while let Some(block) = stream.next().await {
62+
let span = tracing::info_span!("token_fetcher", block = block.number);
63+
let should_break = async {
64+
let inner = match inner.upgrade() {
65+
Some(inner) => inner,
66+
// Fetcher was dropped, stop update task.
67+
None => return true,
68+
};
69+
if let Err(err) = update_balances(inner).await {
70+
tracing::warn!(?err, "error updating token cache");
71+
}
72+
false
73+
}
74+
.instrument(span)
75+
.await;
76+
if should_break {
77+
break;
7278
}
7379
}
7480
}

crates/ethrpc/src/block_stream/mod.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ pub async fn current_block_ws_stream(
172172
let first_block = BlockInfo::try_from(first_block).context("failed to parse initial block")?;
173173

174174
let (sender, receiver) = watch::channel(first_block);
175-
let update_future = async move {
175+
tokio::task::spawn(async move {
176176
// Keep WebSocket provider alive to maintain connection
177177
let _ws_provider = ws_provider;
178178
let mut previous_block = first_block;
@@ -188,6 +188,9 @@ pub async fn current_block_ws_stream(
188188
}
189189
};
190190

191+
let _span =
192+
tracing::info_span!("current_block_stream", block = block_info.number).entered();
193+
191194
update_current_block_metrics(block_info.number);
192195

193196
// If the block is exactly the same as the previous one, ignore it.
@@ -217,9 +220,7 @@ pub async fn current_block_ws_stream(
217220

218221
// If we reach here, the stream ended permanently
219222
tracing::error!("block stream ended after max reconnection attempts");
220-
};
221-
222-
tokio::task::spawn(update_future.instrument(tracing::info_span!("current_block_stream")));
223+
});
223224
Ok(receiver)
224225
}
225226

crates/shared/src/account_balances/cached.rs

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -116,42 +116,47 @@ impl Balances {
116116
let cache = self.balance_cache.clone();
117117
let mut stream = into_stream(block_stream);
118118

119-
let task = async move {
119+
tokio::spawn(async move {
120120
while let Some(block) = stream.next().await {
121-
let balances_to_update = {
121+
let span = tracing::info_span!("balance_cache", block = block.number);
122+
async {
123+
let balances_to_update = {
124+
let mut cache = cache.lock().unwrap();
125+
cache.last_seen_block = block.number;
126+
cache
127+
.data
128+
.iter()
129+
.filter_map(|(query, entry)| {
130+
// Only update balances that have been requested recently.
131+
let oldest_allowed_request =
132+
cache.last_seen_block.saturating_sub(EVICTION_TIME);
133+
(entry.requested_at >= oldest_allowed_request)
134+
.then_some(query.clone())
135+
})
136+
.collect_vec()
137+
};
138+
139+
let results = inner.get_balances(&balances_to_update).await;
140+
122141
let mut cache = cache.lock().unwrap();
123-
cache.last_seen_block = block.number;
124-
cache
125-
.data
126-
.iter()
127-
.filter_map(|(query, entry)| {
128-
// Only update balances that have been requested recently.
129-
let oldest_allowed_request =
130-
cache.last_seen_block.saturating_sub(EVICTION_TIME);
131-
(entry.requested_at >= oldest_allowed_request).then_some(query.clone())
132-
})
133-
.collect_vec()
134-
};
135-
136-
let results = inner.get_balances(&balances_to_update).await;
137-
138-
let mut cache = cache.lock().unwrap();
139-
balances_to_update
140-
.into_iter()
141-
.zip(results)
142-
.for_each(|(query, result)| {
143-
if let Ok(balance) = result {
144-
cache.update_balance(&query, balance, block.number);
145-
}
142+
balances_to_update
143+
.into_iter()
144+
.zip(results)
145+
.for_each(|(query, result)| {
146+
if let Ok(balance) = result {
147+
cache.update_balance(&query, balance, block.number);
148+
}
149+
});
150+
cache.data.retain(|_, value| {
151+
// Only keep balances where we know we have the most recent data.
152+
value.updated_at >= block.number
146153
});
147-
cache.data.retain(|_, value| {
148-
// Only keep balances where we know we have the most recent data.
149-
value.updated_at >= block.number
150-
});
154+
}
155+
.instrument(span)
156+
.await;
151157
}
152158
tracing::error!("block stream terminated unexpectedly");
153-
};
154-
tokio::spawn(task.instrument(tracing::info_span!("balance_cache")));
159+
});
155160
}
156161
}
157162

crates/shared/src/recent_block_cache.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -200,21 +200,28 @@ where
200200
block_stream: CurrentBlockWatcher,
201201
label: String,
202202
) {
203-
tokio::task::spawn(
204-
async move {
205-
let mut stream = ethrpc::block_stream::into_stream(block_stream);
206-
while let Some(block) = stream.next().await {
203+
tokio::task::spawn(async move {
204+
let mut stream = ethrpc::block_stream::into_stream(block_stream);
205+
while let Some(block) = stream.next().await {
206+
let span =
207+
tracing::info_span!("cache_maintenance", cache = %label, block = block.number);
208+
let should_break = async {
207209
let Some(inner) = inner.upgrade() else {
208210
tracing::debug!("cache no longer in use; terminate GC task");
209-
break;
211+
return true;
210212
};
211213
if let Err(err) = inner.update_cache_at_block(block.number).await {
212214
tracing::warn!(?err, "failed to update cache");
213215
}
216+
false
217+
}
218+
.instrument(span)
219+
.await;
220+
if should_break {
221+
break;
214222
}
215223
}
216-
.instrument(tracing::info_span!("cache_maintenance", cache = label)),
217-
);
224+
});
218225
}
219226
}
220227

0 commit comments

Comments
 (0)