Commit cccecf1

committed
update
1 parent 2e9d733 commit cccecf1

1 file changed: +11 -24 lines changed

datafusion-examples/examples/thread_pools.rs

@@ -62,6 +62,10 @@ async fn main() -> Result<()> {
     // `await` here, so the the `async` function still runs on the current runtime.
     // We use the `DedicatedExecutor` to run the query on a different runtime.
     different_runtime_basic(ctx, sql).await?;
+
+    // Run the same query on a different runtime including remote IO
+    different_runtime_advanced(ctx, sql).await?;
+
     Ok(())
 }
 
@@ -129,35 +133,18 @@ async fn different_runtime_basic(ctx: SessionContext, sql: String) -> Result<()>
         // and the `await` simply notifies when the work is done that the work is done
         .await??;
 
+    // When done with a DedicatedExecutor, it should be shut down cleanly to give
+    // any outstanding tasks a chance to clean up
+    dedicated_executor.join().await;
+
     Ok(())
 }
 
+/// Demonstrates how to run queries on a **different** runtime than the current one but
+/// run IO operations on the current runtime
+
 /// this is run on a new, separate runtime
 async fn different_runtime_inner(ctx: SessionContext, sql: String) -> Result<()> {
-    // Setup execution as before
-    let df = ctx.sql(&sql).await?;
-
-    let mut stream = df.execute_stream().await?;
-
-    //XXX Note at this point, calling next() will be run on our new threadpool. However, this will also spawn the catalog and object store requests on the same threadpool as well!
-
-    // While this will mean we don't interfere with handling of other network requests, it will mean tht the network requests that happen as part of query processing will still be running on the same threadpool
-
-    // TODO show this working
-
-    // To avoid this, all IO access, both catalog and data (e.g. object_store) must be spawned on to their own runtime, like this:
-    // TODO....
-
-    //
-    // care is required to avoid calling `next()` (aka polling) from the default IO thread (even if planning / execution is run on that other thread)
-    // Best practice is to do all of DataFusion planning / execution on a separate pool. Note that some care is required for remote catalogs such as iceberg that
-    // themselves do network IO
-    // TODO figure out how to cause an erorr due to io thread
-
-    while let Some(batch) = stream.next().await {
-        println!("{}", pretty_format_batches(&[batch?]).unwrap());
-    }
-
     //
     // You can use the DataFusion "DedicatedExecutor" in this case to handle most of this automatically for you.
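Note on the `join()` call added in this commit: the pattern is to hand work to a separate pool, wait for the result, then shut the pool down so queued tasks can finish. The sketch below illustrates that lifecycle using only the Rust standard library. It is not the `DedicatedExecutor` from the example and does not use Tokio. `DedicatedPool`, its methods, and `sum_on_dedicated_pool` are hypothetical names introduced here for illustration only.

```rust
use std::sync::mpsc;
use std::thread;

/// A boxed unit of work that can be sent to the worker thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical std-only stand-in for a dedicated executor:
/// a single worker thread fed through a channel.
pub struct DedicatedPool {
    sender: mpsc::Sender<Job>,
    worker: thread::JoinHandle<()>,
}

impl DedicatedPool {
    /// Start the worker thread with the given name.
    pub fn new(name: &str) -> Self {
        let (sender, receiver) = mpsc::channel::<Job>();
        let worker = thread::Builder::new()
            .name(name.to_string())
            .spawn(move || {
                // The loop ends once every `Sender` has been dropped
                // and the queue has been drained.
                for job in receiver {
                    job();
                }
            })
            .expect("failed to spawn worker thread");
        Self { sender, worker }
    }

    /// Submit work to the pool; the returned receiver yields its result.
    pub fn spawn<T, F>(&self, work: F) -> mpsc::Receiver<T>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        let job: Job = Box::new(move || {
            // Ignore send errors: the caller may have dropped the receiver.
            let _ = tx.send(work());
        });
        self.sender
            .send(job)
            .expect("worker thread exited unexpectedly");
        rx
    }

    /// Stop accepting work, let queued jobs finish, and wait for the worker.
    pub fn join(self) {
        let DedicatedPool { sender, worker } = self;
        // Dropping the sender closes the channel, which ends the worker loop.
        drop(sender);
        worker.join().expect("worker thread panicked");
    }
}

/// Run a CPU-bound sum on the pool, then shut the pool down cleanly.
pub fn sum_on_dedicated_pool(n: u64) -> u64 {
    let pool = DedicatedPool::new("cpu-worker");
    let result = pool.spawn(move || (1..=n).sum::<u64>());
    let total = result.recv().expect("worker dropped the result");
    pool.join();
    total
}
```

For example, `sum_on_dedicated_pool(100)` computes the sum on the worker thread and returns only after the pool has been joined. Jobs queued before `join` still run to completion, which is the behavior the new comment in `different_runtime_basic` asks for.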
