Skip to content

Commit de91326

Browse files
committed
Add example for using a separate threadpool for CPU bound work
1 parent e7f7a9b commit de91326

File tree

1 file changed

+153
-0
lines changed

1 file changed

+153
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::util::pretty::pretty_format_batches;
19+
use datafusion::error::Result;
20+
use datafusion::prelude::*;
21+
use futures::stream::StreamExt;
22+
23+
/// This example shows how to use a separate thread pool (tokio [`Runtime`])) to
24+
/// run DataFusion CPU intensive jobs.
25+
///
26+
/// If you just run a DataFusion plan without setting up a separate `Runtime`,
27+
/// it will execute CPU intensive jobs on the same thread pool as any I/O which
28+
/// can cause the issues described in the [Architecture section] such as
29+
/// bandwidth being throttled by network congestion control and increasing
30+
/// latencies for processing network messages.
31+
32+
/// The `#[tokio::main]` macro actually creates a [`Runtime`] and installs it in
33+
/// a thread local variable as the "current" runtime (on which `async` futures,
34+
/// streams and tasks are run).
35+
///
36+
/// Basic programs do not need to delve into this level of detail, but for this
37+
/// example it is important to understand which [`Runtime`] is the "current" one.
38+
#[tokio::main]
39+
async fn main() -> Result<()> {
40+
let ctx = SessionContext::new()
41+
// enabling URL table means we can select directly from
42+
// paths in SQL queries.
43+
.enable_url_table();
44+
let sql = format!(
45+
"SELECT * FROM '{}/alltypes_plain.parquet'",
46+
datafusion::test_util::parquet_test_data()
47+
);
48+
49+
same_runtime(&ctx, &sql).await?;
50+
different_runtime(ctx, sql).await?;
51+
Ok(())
52+
}
53+
54+
/// Demonstrates how to run queries directly on the current tokio `Runtime`
55+
///
56+
/// This is how examples are shown in DataFusion and works well for development
57+
/// and local query processing.
58+
async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> {
59+
// Note that calling .sql is an async function as it may also do network
60+
// I/O, for example to contact a remote catalog or do an object store LIST
61+
let df = ctx.sql(sql).await?;
62+
63+
// Many examples call `collect` or `show()` which buffers the results, but
64+
// internally DataFusion generates output a RecordBatch at a time
65+
66+
// Calling `execute_stream` on a DataFrame returns a
67+
// `SendableRecordBatchStream` which can then be incrementally polled
68+
let mut stream = df.execute_stream().await?;
69+
70+
// Calling `next()` drives the plan, producing new `RecordBatch`es using the
71+
// current runtime (and typically also the current thread).
72+
//
73+
// Note that executing the plan like this results in all CPU intensive work
74+
// is done on the same runtime that is used to do IO by default.
75+
while let Some(batch) = stream.next().await {
76+
println!("{}", pretty_format_batches(&[batch?]).unwrap());
77+
}
78+
Ok(())
79+
}
80+
81+
/// Demonstrates how to run queries on a **different** runtime than the current one
82+
///
83+
/// This is typically how you should run DataFusion queries from a network
84+
/// server or when processing data from a remote object store.
85+
async fn different_runtime(ctx: SessionContext, sql: String) -> Result<()> {
86+
// First, we need a new runtime, which we can create with the tokio builder
87+
// however, since we are already in the context of another runtime
88+
// (installed by #[tokio::main]) we create a new thread for the runtime
89+
tokio::task::spawn_blocking(move || {
90+
std::thread::spawn(move || thread_entry(ctx, sql.to_string()))
91+
.join()
92+
.expect("thread did not panic")
93+
})
94+
.await
95+
.expect("task did not panic")
96+
}
97+
98+
/// This is the entry point of thread that we started our second runtime on
99+
fn thread_entry(ctx: SessionContext, sql: String) -> Result<()> {
100+
let runtime = tokio::runtime::Builder::new_multi_thread()
101+
// only enable the time driver (not the I/O driver), meaning this
102+
// runtime will not be able to perform network I/O
103+
.enable_time()
104+
.build()?;
105+
106+
// Now we can run the actual code we want on a different runtime
107+
runtime.block_on(async move { different_runtime_inner(ctx, sql).await })
108+
109+
}
110+
111+
/// this is run on a new, separate runtime
112+
async fn different_runtime_inner(ctx: SessionContext, sql: String) -> Result<()> {
113+
// Setup execution as before
114+
let df = ctx.sql(&sql)
115+
.await?;
116+
117+
let mut stream = df.execute_stream().await?;
118+
119+
120+
//XXX Note at this point, calling next() will be run on our new threadpool. However, this will also spawn the catalog and object store requests on the same threadpool as well!
121+
122+
// While this will mean we don't interfere with handling of other network requests, it will mean tht the network requests that happen as part of query processing will still be running on the same threadpool
123+
124+
// TODO show this working
125+
126+
// To avoid this, all IO access, both catalog and data (e.g. object_store) must be spawned on to their own runtime, like this:
127+
// TODO....
128+
129+
130+
//
131+
// care is required to avoid calling `next()` (aka polling) from the default IO thread (even if planning / execution is run on that other thread)
132+
// Best practice is to do all of DataFusion planning / execution on a separate pool. Note that some care is required for remote catalogs such as iceberg that
133+
// themselves do network IO
134+
// TODO figure out how to cause an erorr due to io thread
135+
136+
while let Some(batch) = stream.next().await {
137+
println!("{}", pretty_format_batches(&[batch?]).unwrap());
138+
}
139+
140+
141+
142+
//
143+
// You can use the DataFusion "DedicatedExecutor" in this case to handle most of this automatically for you.
144+
145+
// Note that special care must be taken when running Datafusion plans that do async io to transfer the work to their own thread pools.
146+
147+
// Using separate Runtime will avoid other requests being messed up but it won't really help requests made from DataSources such as
148+
// reading parquet files from object_store.
149+
//
150+
// Thus this runtime also disables IO so that we are sure there is no IO work being done on it.
151+
152+
Ok(())
153+
}

0 commit comments

Comments
 (0)