-
Notifications
You must be signed in to change notification settings - Fork 185
Expand file tree
/
Copy pathasync_read.rs
More file actions
301 lines (256 loc) · 11.5 KB
/
async_read.rs
File metadata and controls
301 lines (256 loc) · 11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
//! Functions that enable extracting or streaming a Conda package for objects
//! that implement the [`tokio::io::AsyncRead`] trait.
use std::path::Path;
use async_compression::tokio::bufread::BzDecoder;
use async_spooled_tempfile::SpooledTempFile;
use async_zip::base::read::stream::ZipFileReader;
use tokio::io::{AsyncRead, AsyncSeekExt};
use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
#[cfg(feature = "reqwest")]
use {futures_util::StreamExt as _, tokio::io::AsyncReadExt};
use crate::{read::SizeCountingReader, ExtractError, ExtractResult};
use super::shared::{extract_tar_zst_entry, unpack_tar_archive, DEFAULT_BUF_SIZE};
/// Extracts the contents of a `.tar.bz2` package archive using a fully async
/// implementation.
///
/// The input stream is wrapped in hashing readers so the SHA256 and MD5
/// digests and the total number of bytes consumed are computed in the same
/// pass as the extraction.
///
/// # Errors
///
/// Returns [`ExtractError::CouldNotCreateDestination`] if the destination
/// directory cannot be created, and [`ExtractError::IoError`] for read or
/// unpack failures — including a stream from which no data at all could be
/// read (likely truncated input).
pub async fn extract_tar_bz2(
    reader: impl AsyncRead + Send + Unpin + 'static,
    destination: &Path,
) -> Result<ExtractResult, ExtractError> {
    // Ensure the destination directory exists
    tokio::fs::create_dir_all(destination)
        .await
        .map_err(ExtractError::CouldNotCreateDestination)?;
    // Clone destination for the async block
    let destination = destination.to_owned();
    // Wrap the reader in additional readers that will compute the hashes and
    // the total byte count while extracting.
    let sha256_reader = rattler_digest::HashingReader::<_, rattler_digest::Sha256>::new(reader);
    let mut md5_reader =
        rattler_digest::HashingReader::<_, rattler_digest::Md5>::new(sha256_reader);
    let mut size_reader = SizeCountingReader::new(&mut md5_reader);
    // Create a buffered reader for better performance
    let buf_reader = tokio::io::BufReader::with_capacity(DEFAULT_BUF_SIZE, &mut size_reader);
    // Decompress bzip2 asynchronously
    let decoder = BzDecoder::new(buf_reader);
    // Build archive with optimized settings for faster extraction:
    // - Skip mtime preservation to avoid extra syscalls
    // - Skip automatic permission handling (we'll set executable bits manually)
    // - Skip extended attributes for better performance
    // - Reject symlinks that point outside the destination
    let archive = tokio_tar::ArchiveBuilder::new(decoder)
        // BUGFIX: this was `set_preserve_mtime(true)`, which contradicted the
        // stated intent above of skipping mtime preservation; `false` avoids
        // the extra per-file timestamp syscalls.
        .set_preserve_mtime(false)
        .set_preserve_permissions(false)
        .set_unpack_xattrs(false)
        .set_allow_external_symlinks(false)
        .build();
    // Unpack entries manually, preserving only executable bits on Unix
    unpack_tar_archive(archive, &destination).await?;
    // Read the file to the end to make sure the hashes cover the entire
    // stream, not just the bytes the tar reader happened to consume.
    tokio::io::copy(&mut size_reader, &mut tokio::io::sink())
        .await
        .map_err(ExtractError::IoError)?;
    // Get the size and hashes
    let (_, total_size) = size_reader.finalize();
    let (sha256_reader, md5) = md5_reader.finalize();
    let (_, sha256) = sha256_reader.finalize();
    // Validate that we actually read some data from the stream.
    // If total_size is 0, it likely means the stream was truncated or the bzip2
    // decompressor silently failed without detecting an incomplete stream.
    if total_size == 0 {
        return Err(ExtractError::IoError(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "no data was read from the package stream - the stream may have been truncated",
        )));
    }
    Ok(ExtractResult {
        sha256,
        md5,
        total_size,
    })
}
/// Extracts the contents of a `.conda` package archive using fully async implementation.
/// This will perform on-the-fly decompression by streaming the reader.
///
/// A `.conda` package is a ZIP container; only members whose name ends in
/// `.tar.zst` are decompressed and unpacked into `destination`. The SHA256
/// and MD5 digests and the total stream size are computed in the same pass.
pub async fn extract_conda(
    reader: impl AsyncRead + Send + Unpin + 'static,
    destination: &Path,
) -> Result<ExtractResult, ExtractError> {
    // Ensure the destination directory exists
    tokio::fs::create_dir_all(destination)
        .await
        .map_err(ExtractError::CouldNotCreateDestination)?;
    // Clone destination for the async block
    let destination = destination.to_owned();
    // Wrap the reading in additional readers that will compute the hashes while extracting
    let sha256_reader = rattler_digest::HashingReader::<_, rattler_digest::Sha256>::new(reader);
    let mut md5_reader =
        rattler_digest::HashingReader::<_, rattler_digest::Md5>::new(sha256_reader);
    let mut size_reader = SizeCountingReader::new(&mut md5_reader);
    // Convert to futures traits and create a buffered reader (async_zip uses futures traits)
    let compat_reader = (&mut size_reader).compat();
    let mut buf_reader = futures::io::BufReader::with_capacity(DEFAULT_BUF_SIZE, compat_reader);
    // Create a ZIP reader for streaming
    let mut zip_reader = ZipFileReader::new(&mut buf_reader);
    // Process each ZIP entry in stream order (a streaming reader cannot seek,
    // so entries are visited exactly once, front to back).
    while let Some(mut entry) = zip_reader
        .next_with_entry()
        .await
        .map_err(|e| ExtractError::IoError(std::io::Error::other(e)))?
    {
        // Entry names may not be valid UTF-8; surface that as InvalidData.
        let filename = entry.reader().entry().filename().as_str().map_err(|e| {
            ExtractError::IoError(std::io::Error::new(std::io::ErrorKind::InvalidData, e))
        })?;
        // Only extract .tar.zst files
        if filename.ends_with(".tar.zst") {
            // Get a reader for the entry and convert from futures traits to tokio traits
            let mut compat_entry = entry.reader_mut().compat();
            extract_tar_zst_entry(&mut compat_entry, &destination).await?;
        }
        // Skip to the next entry (required by async_zip API). `skip()` consumes
        // the current entry and hands back the reader for the next iteration.
        (.., zip_reader) = entry
            .skip()
            .await
            .map_err(|e| ExtractError::IoError(std::io::Error::other(e)))?;
    }
    // Read any remaining data to ensure hash is properly computed
    // Use futures copy since we're already in futures ecosystem
    futures::io::copy(&mut buf_reader, &mut futures::io::sink())
        .await
        .map_err(ExtractError::IoError)?;
    // Get the size and hashes
    let (_, total_size) = size_reader.finalize();
    let (sha256_reader, md5) = md5_reader.finalize();
    let (_, sha256) = sha256_reader.finalize();
    // Validate that we actually read some data from the stream
    if total_size == 0 {
        return Err(ExtractError::IoError(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "no data was read from the package stream - the stream may have been truncated",
        )));
    }
    Ok(ExtractResult {
        sha256,
        md5,
        total_size,
    })
}
/// Extracts the contents of a .conda package archive by fully reading the
/// stream and then decompressing. This is a fallback method for when streaming fails.
///
/// This implementation uses a `SpooledTempFile` (5MB in-memory threshold) to buffer
/// the package data, then uses the seek-based ZIP API for efficient extraction.
///
/// Note: because this is used as a fallback after a failed streaming attempt,
/// any partially-extracted `destination` directory is removed first.
pub async fn extract_conda_via_buffering(
    reader: impl AsyncRead + Send + Unpin + 'static,
    destination: &Path,
) -> Result<ExtractResult, ExtractError> {
    // Delete destination first if it exists, as this method is usually used as a fallback
    if tokio::fs::try_exists(destination)
        .await
        .map_err(ExtractError::IoError)?
    {
        tokio::fs::remove_dir_all(destination)
            .await
            .map_err(ExtractError::CouldNotCreateDestination)?;
    }
    // Ensure the destination directory exists
    tokio::fs::create_dir_all(destination)
        .await
        .map_err(ExtractError::CouldNotCreateDestination)?;
    // Clone destination for the async block
    let destination = destination.to_owned();
    // Wrap the reading in additional readers that will compute the hashes while extracting
    let sha256_reader = rattler_digest::HashingReader::<_, rattler_digest::Sha256>::new(reader);
    let mut md5_reader =
        rattler_digest::HashingReader::<_, rattler_digest::Md5>::new(sha256_reader);
    let mut size_reader = SizeCountingReader::new(&mut md5_reader);
    // Create a SpooledTempFile (uses memory up to 5MB, then switches to disk)
    let mut spooled_file = SpooledTempFile::new(5 * 1024 * 1024);
    // Copy from reader to spooled file while computing hashes
    tokio::io::copy(&mut size_reader, &mut spooled_file)
        .await
        .map_err(ExtractError::IoError)?;
    // Get the size and hashes now that we've read everything. This must happen
    // before extraction: the digests cover the raw package stream, not the
    // extracted files.
    let (_, total_size) = size_reader.finalize();
    let (sha256_reader, md5) = md5_reader.finalize();
    let (_, sha256) = sha256_reader.finalize();
    // Validate that we actually read some data from the stream
    if total_size == 0 {
        return Err(ExtractError::IoError(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "no data was read from the package stream - the stream may have been truncated",
        )));
    }
    // Rewind the spooled file to the beginning
    spooled_file.rewind().await.map_err(ExtractError::IoError)?;
    // Use the seek-based extraction (doesn't recompute hashes, we already have them)
    crate::tokio::async_seek::extract_conda(spooled_file, &destination).await?;
    Ok(ExtractResult {
        sha256,
        md5,
        total_size,
    })
}
/// Maps a path inside a `.conda` archive to the ZIP entry prefix that holds
/// it: paths under the `info` component belong to the `"info-"` entry, all
/// other paths to the `"pkg-"` entry.
#[cfg(feature = "reqwest")]
pub(crate) fn conda_entry_prefix(target_path: &Path) -> &'static str {
    // `Path::starts_with` compares whole path components, so both `info` and
    // `info/...` match while e.g. `information/...` does not.
    match target_path.starts_with("info") {
        true => "info-",
        false => "pkg-",
    }
}
/// Async equivalent of [`crate::seek::get_file_from_archive`].
///
/// Iterates entries of a tar archive, returning the contents of the first
/// entry whose path matches `file_name`. Because the reader is streaming,
/// only the bytes up to (and including) the target entry are consumed.
#[cfg(feature = "reqwest")]
pub(crate) async fn get_file_from_tar_archive<R: tokio::io::AsyncRead + Unpin>(
archive: &mut tokio_tar::Archive<R>,
file_name: &Path,
) -> Result<Option<Vec<u8>>, ExtractError> {
let mut entries = archive.entries().map_err(ExtractError::IoError)?;
while let Some(entry) = entries.next().await {
let mut entry = entry.map_err(ExtractError::IoError)?;
let path = entry.path().map_err(ExtractError::IoError)?;
if path.as_ref() == file_name {
let size = entry.header().size().map_err(ExtractError::IoError)?;
let mut buf = Vec::with_capacity(size as usize);
entry
.read_to_end(&mut buf)
.await
.map_err(ExtractError::IoError)?;
return Ok(Some(buf));
}
}
Ok(None)
}
#[cfg(all(test, feature = "reqwest"))]
mod tests {
    use super::conda_entry_prefix;
    use std::path::Path;

    #[test]
    fn test_conda_entry_prefix_info_files() {
        // Anything under the info/ directory maps to the "info-" entry.
        for path in [
            "info/index.json",
            "info/about.json",
            "info/paths.json",
            "info/nested/deep/file.txt",
        ] {
            assert_eq!(conda_entry_prefix(Path::new(path)), "info-");
        }
    }

    #[test]
    fn test_conda_entry_prefix_pkg_files() {
        // Regular payload paths map to the "pkg-" entry.
        for path in ["lib/libz.so", "bin/python", "clobber"] {
            assert_eq!(conda_entry_prefix(Path::new(path)), "pkg-");
        }
    }

    #[test]
    fn test_conda_entry_prefix_info_bare() {
        // Path::starts_with works on components, so "info" matches "info/"
        assert_eq!(conda_entry_prefix(Path::new("info")), "info-");
    }

    #[test]
    fn test_conda_entry_prefix_info_like_but_not_info_dir() {
        // Paths that textually start with "info" but are not under the info/
        // directory must still go to the "pkg-" entry.
        for path in ["info-custom.txt", "information/file"] {
            assert_eq!(conda_entry_prefix(Path::new(path)), "pkg-");
        }
    }
}