Skip to content

Commit 1bb1bf2

Browse files
committed
Add example for using a separate threadpool for CPU bound work
1 parent c0ca4b4 commit 1bb1bf2

File tree

6 files changed

+1757
-0
lines changed

6 files changed

+1757
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! This example shows how to use a separate thread pool (tokio [`Runtime`]) to
19+
//! run the CPU intensive parts of DataFusion plans.
20+
//!
21+
//! Running DataFusion plans that perform I/O, such as reading parquet files
22+
//! directly from remote object storage (e.g. AWS S3) without care will result
23+
//! in running CPU intensive jobs on the same thread pool, which can lead to the
24+
//! issues described in the [Architecture section] such as throttled bandwidth
25+
//! due to congestion control and increased latencies for processing network
26+
//! messages.
27+
use arrow::util::pretty::pretty_format_batches;
28+
use datafusion::common::runtime::dedicated_executor;
29+
use datafusion::error::Result;
30+
use datafusion::execution::SendableRecordBatchStream;
31+
use datafusion::physical_plan::DedicatedExecutor;
32+
use datafusion::prelude::*;
33+
use futures::stream::StreamExt;
34+
use object_store::http::HttpBuilder;
35+
use object_store::ObjectStore;
36+
use std::sync::Arc;
37+
use url::Url;
38+
39+
/// Normally, you don't need to worry about the details of the tokio runtime,
40+
/// but for this example it is important to understand how the [`Runtime`]s work.
41+
///
42+
/// There is a "current" runtime that is installed in a thread local variable
43+
/// that is used by the `tokio::spawn` function.
44+
///
45+
/// The `#[tokio::main]` macro actually creates a [`Runtime`] and installs it as
46+
/// the "current" runtime (on which any `async` futures, streams and tasks
47+
/// are run).
48+
#[tokio::main]
49+
async fn main() -> Result<()> {
50+
// The first two examples only do local file IO. Enable the URL table so we
51+
// can select directly from filenames in SQL.
52+
let ctx = SessionContext::new().enable_url_table();
53+
let sql = format!(
54+
"SELECT * FROM '{}/alltypes_plain.parquet'",
55+
datafusion::test_util::parquet_test_data()
56+
);
57+
58+
// Run the same query on the same runtime. Note that calling `await` here
59+
// will effectively run the future (in this case the `async` function) on
60+
// the current runtime.
61+
same_runtime(&ctx, &sql).await?;
62+
63+
// Run the same query on a different runtime. Note that we are still calling
64+
// `await` here, so the `async` function still runs on the current runtime.
65+
// We use the `DedicatedExecutor` to run the query on a different runtime.
66+
different_runtime_basic(ctx, sql).await?;
67+
68+
// Run the same query on a different runtime including remote IO
69+
different_runtime_advanced().await?;
70+
71+
Ok(())
72+
}
73+
74+
/// Run queries directly on the current tokio `Runtime`
75+
///
76+
/// This is how most examples in DataFusion are written, and it works well for
77+
/// development and local query processing.
78+
async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> {
79+
// Calling .sql is an async function as it may also do network
80+
// I/O, for example to contact a remote catalog or do an object store LIST
81+
let df = ctx.sql(sql).await?;
82+
83+
// While many examples call `collect` or `show()`, those methods buffer the
84+
// results. Internally, DataFusion generates output one RecordBatch at a time.
85+
86+
// Calling `execute_stream` on a DataFrame returns a
87+
// `SendableRecordBatchStream`. Depending on the plan, this may also do
88+
// network I/O, for example to begin reading a parquet file from a remote
89+
// object store as well. It is also possible that this function call spawns
90+
// tasks that begin doing CPU intensive work as well
91+
let mut stream: SendableRecordBatchStream = df.execute_stream().await?;
92+
93+
// Calling `next()` drives the plan, producing new `RecordBatch`es using the
94+
// current runtime (and typically also the current thread).
95+
//
96+
// Perhaps somewhat non obvious, calling the `next()` function often will
97+
// result in other tasks being spawned on the current runtime (e.g. for
98+
// `RepartitionExec` to read data from each of its input partitions in
99+
// parallel).
100+
//
101+
// Executing the plan like this results in all CPU intensive work
102+
// running on the same (default) Runtime.
103+
while let Some(batch) = stream.next().await {
104+
println!("{}", pretty_format_batches(&[batch?]).unwrap());
105+
}
106+
Ok(())
107+
}
108+
109+
/// Demonstrates how to run queries on a **different** runtime than the current one
110+
///
111+
/// See [`different_runtime_advanced`] to see how you should run DataFusion
112+
/// queries from a network server or when processing data from a remote object
113+
/// store.
114+
async fn different_runtime_basic(ctx: SessionContext, sql: String) -> Result<()> {
115+
// First, we need a new runtime, which we can create with the tokio builder
116+
// however, since we are already in the context of another runtime
117+
// (installed by #[tokio::main]) we create a new thread for the runtime
118+
let dedicated_executor = DedicatedExecutor::builder().build();
119+
120+
// Now, we can simply run the query on the new runtime
121+
dedicated_executor
122+
.spawn(async move {
123+
// this runs on the different threadpool
124+
let df = ctx.sql(&sql).await?;
125+
let mut stream: SendableRecordBatchStream = df.execute_stream().await?;
126+
127+
// Calling `next()` to drive the plan on the different threadpool
128+
while let Some(batch) = stream.next().await {
129+
println!("{}", pretty_format_batches(&[batch?]).unwrap());
130+
}
131+
Ok(()) as Result<()>
132+
})
133+
// even though we are `await`ing here on the "current" pool, internally
134+
// the DedicatedExecutor runs the work on the separate threadpool
135+
// and the `await` simply notifies us when the work is done
136+
.await??;
137+
138+
// When done with a DedicatedExecutor, it should be shut down cleanly to give
139+
// any outstanding tasks a chance to clean up
140+
dedicated_executor.join().await;
141+
142+
Ok(())
143+
}
144+
145+
/// Demonstrates how to run queries on a different runtime than the current one
146+
/// and how to handle IO operations.
147+
async fn different_runtime_advanced() -> Result<()> {
148+
// In this example, we will configure access to a remote object store
149+
// over the network during the plan
150+
151+
let ctx = SessionContext::new().enable_url_table();
152+
153+
// setup http object store
154+
let base_url = Url::parse("https://github.com").unwrap();
155+
let http_store: Arc<dyn ObjectStore> =
156+
Arc::new(HttpBuilder::new().with_url(base_url.clone()).build()?);
157+
158+
let dedicated_executor = DedicatedExecutor::builder().build();
159+
160+
// By default, the object store will use the current runtime for IO operations
161+
// if we use a dedicated executor to run the plan, the eventual object store requests will also use the
162+
// dedicated executor's runtime
163+
//
164+
// To avoid this, we can wrap the object store to run on the "IO" runtime
165+
//
166+
// (if we don't do this the example fails with an error like
167+
//
168+
// ctx.register_object_store(&base_url, http_store);
169+
// A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.)
170+
171+
let http_store = dedicated_executor.wrap_object_store(http_store);
172+
173+
// Tell datafusion about processing https:// urls with this wrapped object store
174+
ctx.register_object_store(&base_url, http_store);
175+
176+
// Plan (and execute) the query on the dedicated runtime
177+
let stream = dedicated_executor
178+
.spawn(async move {
179+
// Plan / execute the query
180+
let url = "https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv";
181+
let df = ctx
182+
.sql(&format!("SELECT c1,c2,c3 FROM '{url}' LIMIT 5"))
183+
.await?;
184+
let stream: SendableRecordBatchStream = df.execute_stream().await?;
185+
186+
Ok(stream) as Result<_>
187+
}).await??;
188+
189+
// We have now planned the query on the dedicated runtime, Yay! but we still need to
190+
// drive the stream (aka call `next()` to get the results).
191+
192+
// However, as mentioned above, calling `next()` resolves the Stream (and
193+
// any work it may do) on a thread in the current (default) runtime.
194+
//
195+
// To drive the stream on the dedicated runtime, we need to wrap it using a
196+
// `DedicatedExecutor::run_sendable_record_batch_stream` function
197+
//
198+
// Note if you don't do this you will likely see a panic about `No IO runtime registered.`
199+
// because the threads in the current (main) tokio runtime have not had the IO runtime
200+
// installed
201+
let mut stream = dedicated_executor.run_sendable_record_batch_stream(stream);
202+
203+
// Note you can run other streams on the DedicatedExecutor as well using the
204+
// DedicatedExecutor:YYYXXX function. This is helpful for example, if you
205+
// need to do non trivial CPU work on the results of the stream (e.g.
206+
// calling a FlightDataEncoder to convert the results to flight to send it
207+
// over the network).
208+
209+
while let Some(batch) = stream.next().await {
210+
println!("{}", pretty_format_batches(&[batch?]).unwrap());
211+
}
212+
213+
Ok(())
214+
}

datafusion/physical-plan/Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ parking_lot = { workspace = true }
6666
pin-project-lite = "^0.2.7"
6767
rand = { workspace = true }
6868
tokio = { workspace = true }
69+
# todo figure out if we need to use tokio_stream / could use record batch receiver stream
70+
tokio-stream = {version = "0.1"}
71+
object_store = { workspace = true }
72+
6973

7074
[dev-dependencies]
7175
criterion = { version = "0.5", features = ["async_futures"] }

0 commit comments

Comments
 (0)