Describe the bug
Related to
- Document DataFusion Threading / tokio runtimes (how to separate IO and CPU bound work) #12393
- Example for using a separate threadpool for CPU bound work (try 2) #14286
- `to_pyarrow_table()` on a table in S3 kept getting "Generic S3 error: error decoding response body" delta-io/delta-rs#2595
Basically, when I just try to read one of the ClickBench parquet files directly from a remote object store (see example below) on a slow internet connection, I get the following error:

called `Result::unwrap()` on an `Err` value: ArrowError(ExternalError(External(Generic { store: "HTTP", source: reqwest::Error { kind: Decode, source: reqwest::Error { kind: Body, source: TimedOut } } })), None)
My example just reads the data back (it is not doing any CPU-intensive processing).
This is very similar to the reports @ion-elgreco has fielded in delta-rs.
To Reproduce
Run this program that just tries to read the file (on a crappy internet connection)
```rust
use std::sync::Arc;
use std::time::Instant;

use datafusion::error::Result;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use futures_util::StreamExt;
use object_store::http::HttpBuilder;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Register an HTTP object store for datasets.clickhouse.com
    let object_store_url = ObjectStoreUrl::parse("https://datasets.clickhouse.com").unwrap();
    let object_store = HttpBuilder::new()
        .with_url(object_store_url.as_str())
        .build()
        .unwrap();
    let max_request_size = 1 * 1024 * 1024; // 1MB
    //let object_store = LimitedRequestSizeObjectStore::new(Arc::new(object_store), max_request_size);
    //let object_store = ReportingObjectStore::new(Arc::new(object_store));
    ctx.register_object_store(object_store_url.as_ref(), Arc::new(object_store));

    let start = Instant::now();
    let options = ParquetReadOptions::new();
    let df = ctx
        .read_parquet(
            "https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet",
            options,
        )
        .await
        .unwrap();
    println!("Created table and plan in {:?}", Instant::now() - start);
    df.clone().explain(false, false).unwrap().show().await.unwrap();

    println!("running query");
    let start = Instant::now();
    let mut stream = df.execute_stream().await.unwrap();
    let mut total_rows = 0;
    let mut total_batches = 0;
    while let Some(batch) = stream.next().await {
        let batch = batch.unwrap();
        total_rows += batch.num_rows();
        total_batches += 1;
        println!("{:?} batches: {total_batches} rows: {total_rows}", Instant::now() - start);
    }
    println!("Done");
    Ok(())
}
```
This results in the following output:
```text
running query
3.753874792s batches: 1 rows: 8192
3.756978125s batches: 2 rows: 16384
3.761664292s batches: 3 rows: 24576
3.765139625s batches: 4 rows: 32768
3.768803333s batches: 5 rows: 40960
3.771689042s batches: 6 rows: 49152
3.773665958s batches: 7 rows: 57344
3.774663333s batches: 8 rows: 62734
thread 'main' panicked at src/main.rs:59:27:
called `Result::unwrap()` on an `Err` value: ArrowError(ExternalError(External(Generic { store: "HTTP", source: reqwest::Error { kind: Decode, source: reqwest::Error { kind: Body, source: TimedOut } } })), None)
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
```
Expected behavior
I expect the query to complete without error.
Additional context
When I added an ObjectStore wrapper that reported what requests were being made to the underlying storage system, I found that DataFusion makes a single "large" request for 93MB. Given the bandwidth of the coffee shop wifi, this request cannot complete within the default 30 second connection timeout, so the request times out and the error is returned to the client.
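For reference, the timeout itself can also be raised via object_store's client options, which masks the problem rather than fixing it. A sketch only (the 300 second value is arbitrary):

```rust
use std::time::Duration;
use object_store::ClientOptions;

// Hypothetical workaround, not a fix: give the single large request more time
// to complete on a slow connection by raising the per-request timeout (default 30s).
let object_store = object_store::http::HttpBuilder::new()
    .with_url(object_store_url.as_str())
    .with_client_options(ClientOptions::new().with_timeout(Duration::from_secs(300)))
    .build()
    .unwrap();
```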
I was able to make the query work by writing another ObjectStore wrapper that splits the single 93MB request into multiple smaller requests, after which my program completes.
Click here to see the idea (horrible code, I am sorry)
```rust
use std::collections::VecDeque;
use std::fmt::Display;
use std::ops::Range;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;

use bytes::Bytes;
use futures_util::stream::BoxStream;
use object_store::path::Path;
use object_store::{
    GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
    PutMultipartOpts, PutOptions, PutPayload, PutResult,
};

/// This is an ObjectStore wrapper that limits the size of individual requests to some maximum
///
/// This is mostly useful for connections that have limited bandwidth so that the requests complete
/// before a timeout is reached.
///
/// This requires more requests, but each request is smaller and more likely to
/// complete within the timeout.
///
/// For example, if the timeout is 30 seconds and the bandwidth is 1MB/s,
/// a single request for 100MB would take around 100s, exceeding the timeout.
#[derive(Debug)]
pub struct LimitedRequestSizeObjectStore {
    inner: Arc<dyn ObjectStore>,
    maximum_request_size: usize,
    next_id: AtomicUsize,
}

impl Display for LimitedRequestSizeObjectStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "LimitedRequestSizeObjectStore({}, maximum_request_size={})",
            self.inner, self.maximum_request_size
        )
    }
}

impl LimitedRequestSizeObjectStore {
    pub fn new(inner: Arc<dyn ObjectStore>, maximum_request_size: usize) -> Self {
        // start at 1000000 to make it easy to distinguish from the other object stores
        let next_id = AtomicUsize::new(1000000);
        Self { inner, maximum_request_size, next_id }
    }

    /// Splits a list of ranges into groups of ranges so that each group requests
    /// no more than `maximum_request_size` bytes in total
    fn split_ranges(&self, ranges: &[Range<usize>]) -> Vec<Vec<Range<usize>>> {
        println!("input split ranges {:?}", ranges);
        let mut remaining_ranges = ranges.iter().cloned().collect::<VecDeque<_>>();
        let mut split_ranges = vec![];
        let mut current_ranges = vec![];
        let mut remaining_bytes = self.maximum_request_size;
        while let Some(range) = remaining_ranges.pop_front() {
            // if there is space left in the current output group, take the whole range
            if range.len() < remaining_bytes {
                remaining_bytes -= range.len();
                current_ranges.push(range);
            }
            // otherwise, need to split the range and put the remainder back for next time
            else {
                let start_range = range.start..range.start + remaining_bytes;
                let end_range = range.start + remaining_bytes..range.end;
                current_ranges.push(start_range);
                remaining_ranges.push_front(end_range);
                // current group is full, so add it to the output and start a new one
                assert_eq!(
                    current_ranges.iter().map(|r| r.len()).sum::<usize>(),
                    self.maximum_request_size
                );
                split_ranges.push(current_ranges);
                current_ranges = vec![];
                remaining_bytes = self.maximum_request_size;
            }
        }
        if !current_ranges.is_empty() {
            split_ranges.push(current_ranges);
        }
        println!("output split ranges {:?}", split_ranges);
        split_ranges
    }
}

#[async_trait::async_trait]
impl ObjectStore for LimitedRequestSizeObjectStore {
    async fn put(&self, location: &Path, payload: PutPayload) -> object_store::Result<PutResult> {
        self.inner.put(location, payload).await
    }

    async fn put_opts(&self, location: &Path, payload: PutPayload, opts: PutOptions) -> object_store::Result<PutResult> {
        self.inner.put_opts(location, payload, opts).await
    }

    async fn put_multipart(&self, location: &Path) -> object_store::Result<Box<dyn MultipartUpload>> {
        self.inner.put_multipart(location).await
    }

    async fn put_multipart_opts(&self, location: &Path, opts: PutMultipartOpts) -> object_store::Result<Box<dyn MultipartUpload>> {
        self.inner.put_multipart_opts(location, opts).await
    }

    async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
        println!("LimitedRequestSizeObjectStore begin get: {}", location);
        let result = self.inner.get(location).await?;
        println!("LimitedRequestSizeObjectStore get: Read {} bytes from {}", result.meta.size, result.meta.location);
        Ok(result)
    }

    async fn get_opts(&self, location: &Path, options: GetOptions) -> object_store::Result<GetResult> {
        println!("LimitedRequestSizeObjectStore begin get: {} ({:?})", location, options);
        let result = self.inner.get_opts(location, options).await?;
        println!("LimitedRequestSizeObjectStore get: Read {} bytes from {}", result.meta.size, result.meta.location);
        Ok(result)
    }

    async fn get_range(&self, location: &Path, range: Range<usize>) -> object_store::Result<Bytes> {
        let id = self.next_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        let total_bytes = range.len();
        let mbytes = total_bytes / 1024 / 1024;
        println!("LimitedRequestSizeObjectStore id={id} get_range: {} {mbytes}MB {total_bytes} bytes", location);
        let mut res = self.get_ranges(location, &[range]).await?;
        println!("LimitedRequestSizeObjectStore id={id} get_range complete");
        assert_eq!(res.len(), 1);
        Ok(res.pop().unwrap())
    }

    async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> object_store::Result<Vec<Bytes>> {
        let id = self.next_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        let total_bytes: usize = ranges.iter().map(|r| r.len()).sum();
        let mbytes = total_bytes / 1024 / 1024;
        println!("LimitedRequestSizeObjectStore id={id} get_ranges: {} {mbytes}MB {total_bytes} bytes", location);
        let split_ranges = self.split_ranges(ranges);
        println!(" Have {} split ranges", split_ranges.len());
        // perform a request for each group of ranges and put them back together at the end
        let mut overall_result = Vec::with_capacity(total_bytes);
        for split_range in split_ranges {
            let total_range_bytes: usize = split_range.iter().map(|r| r.len()).sum();
            println!(" requesting {} inner ranges {total_range_bytes} bytes", split_range.len());
            let responses = self.inner.get_ranges(location, &split_range).await?;
            for response in responses {
                overall_result.extend(response.as_ref());
            }
        }
        assert_eq!(overall_result.len(), total_bytes);
        // convert it to Bytes and slice it up into the original ranges
        let overall_result = Bytes::from(overall_result);
        let mut result_ranges = vec![];
        let mut offset = 0;
        for range in ranges {
            let slice = overall_result.slice(offset..offset + range.len());
            offset += range.len();
            result_ranges.push(slice);
        }
        println!("LimitedRequestSizeObjectStore id={id} get_ranges complete");
        Ok(result_ranges)
    }

    async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
        self.inner.head(location).await
    }

    async fn delete(&self, location: &Path) -> object_store::Result<()> {
        self.inner.delete(location).await
    }

    fn delete_stream<'a>(&'a self, locations: BoxStream<'a, object_store::Result<Path>>) -> BoxStream<'a, object_store::Result<Path>> {
        self.inner.delete_stream(locations)
    }

    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
        self.inner.list(prefix)
    }

    fn list_with_offset(&self, prefix: Option<&Path>, offset: &Path) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
        self.inner.list_with_offset(prefix, offset)
    }

    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
        self.inner.list_with_delimiter(prefix).await
    }

    async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        self.inner.copy(from, to).await
    }

    async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        self.inner.rename(from, to).await
    }

    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        self.inner.copy_if_not_exists(from, to).await
    }

    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        self.inner.rename_if_not_exists(from, to).await
    }
}
```
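With the wrapper in place, the reproducer above only needs the commented-out registration enabled, i.e. something like:

```rust
// Wrap the HTTP store so each read is split into requests of at most
// max_request_size bytes (1MB in the reproducer) before registering it.
let object_store = LimitedRequestSizeObjectStore::new(Arc::new(object_store), max_request_size);
ctx.register_object_store(object_store_url.as_ref(), Arc::new(object_store));
```

With that change the query completes, at the cost of making more (smaller) requests.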