Skip to content

Commit 224c2d5

Browse files
authored
fix(objectstore): Retry attachment downloads (#1946)
Ideally we'd be using the download service directly (as noted in the comment), there is an attempt in #1928 but requires a bit more thought and work. Right now we do see some download failures which can be mitigated with retries. So while it's not great and makes the `GenericErrorHandler` public with a helper fn, it's what we got.
1 parent 9d49754 commit 224c2d5

2 files changed

Lines changed: 42 additions & 30 deletions

File tree

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
11
use std::fs::File;
2-
use std::pin::pin;
32

43
use futures::TryStreamExt;
5-
use symbolicator_service::download::DownloadService;
4+
use symbolicator_service::caching::CacheError;
5+
use symbolicator_service::download::{self, DownloadService};
66
use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufWriter};
7-
use tokio_util::io::StreamReader;
87

98
use crate::interface::AttachmentFile;
109

1110
pub async fn download_attachment(
1211
download_svc: &DownloadService,
1312
file: AttachmentFile,
14-
) -> anyhow::Result<File> {
13+
) -> Result<File, CacheError> {
1514
let (storage_url, storage_token) = match file {
1615
AttachmentFile::Local(file) => return Ok(file),
1716
AttachmentFile::Remote {
@@ -25,26 +24,32 @@ pub async fn download_attachment(
2524
// download files in multiple chunks concurrently, but I don’t think our `objecstore` server currently
2625
// supports range requests, and those would also mess with streaming decompression.
2726
// Not to mention that using the `DownloadService` is not that straight forward.
28-
let mut request = download_svc.trusted_client.get(storage_url);
29-
if let Some(token) = storage_token {
30-
request = request.bearer_auth(token);
31-
}
32-
let stream = request
33-
.send()
34-
.await?
35-
.error_for_status()?
36-
.bytes_stream()
37-
.map_err(std::io::Error::other);
38-
let mut reader = pin!(StreamReader::new(stream));
39-
40-
let file = tempfile::tempfile()?;
41-
let mut writer = BufWriter::new(tokio::fs::File::from_std(file));
42-
tokio::io::copy(&mut reader, &mut writer).await?;
43-
writer.flush().await?;
44-
let mut file = writer.into_inner();
45-
file.sync_data().await?;
46-
47-
file.rewind().await?;
48-
49-
Ok(file.into_std().await)
27+
download::retry(|| async {
28+
let mut request = download_svc.trusted_client.get(&storage_url);
29+
if let Some(token) = storage_token.as_ref() {
30+
request = request.bearer_auth(token);
31+
}
32+
let response = request.send().await?;
33+
if !response.status().is_success() {
34+
return Err(
35+
download::GenericErrorHandler::handle_response(&storage_url, response).await,
36+
);
37+
}
38+
39+
let mut stream = response.bytes_stream();
40+
41+
let file = tempfile::tempfile()?;
42+
let mut writer = BufWriter::new(tokio::fs::File::from_std(file));
43+
while let Some(chunk) = stream.try_next().await? {
44+
writer.write_all(&chunk).await?;
45+
}
46+
writer.flush().await?;
47+
let mut file = writer.into_inner();
48+
file.sync_data().await?;
49+
50+
file.rewind().await?;
51+
52+
Ok(file.into_std().await)
53+
})
54+
.await
5055
}

crates/symbolicator-service/src/download/mod.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -633,14 +633,15 @@ async fn do_download_reqwest(
633633
///
634634
/// This error handler uses the HTTP status code to infer the [`CacheError`],
635635
/// this works for any HTTP request, but does not consider API specific responses.
636-
struct GenericErrorHandler;
636+
pub struct GenericErrorHandler;
637637

638-
impl ErrorHandler for GenericErrorHandler {
639-
async fn handle(&self, source: &str, response: SymResponse<'_>) -> CacheError {
638+
impl GenericErrorHandler {
639+
/// Converts an unsuccessful HTTP response to a [`CacheError`].
640+
pub async fn handle_response(source: &str, response: reqwest::Response) -> CacheError {
640641
let status = response.status();
641642
debug_assert!(!status.is_success());
642643

643-
if let Ok(details) = response.response.text().await {
644+
if let Ok(details) = response.text().await {
644645
::sentry::configure_scope(|scope| {
645646
scope.set_extra(
646647
"reqwest_response_body",
@@ -677,6 +678,12 @@ impl ErrorHandler for GenericErrorHandler {
677678
}
678679
}
679680

681+
impl ErrorHandler for GenericErrorHandler {
682+
async fn handle(&self, source: &str, response: SymResponse<'_>) -> CacheError {
683+
Self::handle_response(source, response.response).await
684+
}
685+
}
686+
680687
/// A HTTP request Symbolicator wants to make.
681688
struct SymRequest<'a> {
682689
source_name: &'a str,

0 commit comments

Comments
 (0)