25
25
//! due to congestion control and increased latencies for processing network
26
26
//! messages.
27
27
28
+ use std:: sync:: Arc ;
28
29
use arrow:: util:: pretty:: pretty_format_batches;
29
- use datafusion:: common:: runtime:: dedicated_executor:: DedicatedExecutor ;
30
30
use datafusion:: common:: runtime:: DedicatedExecutorBuilder ;
31
31
use datafusion:: error:: Result ;
32
32
use datafusion:: execution:: SendableRecordBatchStream ;
33
33
use datafusion:: prelude:: * ;
34
34
use futures:: stream:: StreamExt ;
35
+ use object_store:: http:: HttpBuilder ;
36
+ use url:: Url ;
35
37
36
38
/// Normally, you don't need to worry about the details of the tokio runtime,
37
39
/// but for this example it is important to understand how the [`Runtime`]s work.
@@ -44,10 +46,9 @@ use futures::stream::StreamExt;
44
46
/// are run).
45
47
#[ tokio:: main]
46
48
async fn main ( ) -> Result < ( ) > {
47
- let ctx = SessionContext :: new ( )
48
- // enabling URL table means we can select directly from
49
- // paths in SQL queries.
50
- . enable_url_table ( ) ;
49
+ // The first two examples only do local file IO. Enable the URL table so we
50
+ // can select directly from filenames in SQL.
51
+ let ctx = SessionContext :: new ( ) . enable_url_table ( ) ;
51
52
let sql = format ! (
52
53
"SELECT * FROM '{}/alltypes_plain.parquet'" ,
53
54
datafusion:: test_util:: parquet_test_data( )
@@ -64,7 +65,7 @@ async fn main() -> Result<()> {
64
65
different_runtime_basic ( ctx, sql) . await ?;
65
66
66
67
// Run the same query on a different runtime including remote IO
67
- different_runtime_advanced ( ctx , sql ) . await ?;
68
+ different_runtime_advanced ( ) . await ?;
68
69
69
70
Ok ( ( ) )
70
71
}
@@ -140,13 +141,40 @@ async fn different_runtime_basic(ctx: SessionContext, sql: String) -> Result<()>
140
141
Ok ( ( ) )
141
142
}
142
143
143
- /// Demonstrates how to run queries on a **different** runtime than the current one but
144
- /// run IO operations on the current runtime
144
+ /// Demonstrates how to run queries on a different runtime than the current run
145
+ /// and how to handle IO operations.
146
+ ///
145
147
146
- /// this is run on a new, separate runtime
147
- async fn different_runtime_inner ( ctx : SessionContext , sql : String ) -> Result < ( ) > {
148
- //
149
- // You can use the DataFusion "DedicatedExecutor" in this case to handle most of this automatically for you.
148
+ async fn different_runtime_advanced ( ) -> Result < ( ) > {
149
+ // In this example, we will configure access to a remote object store
150
+ // over the network during the plan
151
+
152
+ let ctx = SessionContext :: new ( )
153
+ . enable_url_table ( ) ;
154
+
155
+ // setup http object store
156
+ let base_url = Url :: parse ( "https://github.com" ) . unwrap ( ) ;
157
+ let http_store = HttpBuilder :: new ( )
158
+ . with_url ( base_url. clone ( ) )
159
+ . build ( )
160
+ . unwrap ( ) ;
161
+ ctx. register_object_store ( & base_url, Arc :: new ( http_store) ) ;
162
+
163
+ // register csv file with the execution context
164
+ ctx. register_csv (
165
+ "aggregate_test_100" ,
166
+ "https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv" ,
167
+ CsvReadOptions :: new ( ) ,
168
+ )
169
+ . await ?;
170
+
171
+ // execute the query
172
+ let df = ctx
173
+ . sql ( "SELECT c1,c2,c3 FROM aggregate_test_100 LIMIT 5" )
174
+ . await ?;
175
+
176
+ // print the results
177
+ df. show ( ) . await ?;
150
178
151
179
// Note that special care must be taken when running Datafusion plans that do async io to transfer the work to their own thread pools.
152
180
0 commit comments