@@ -149,39 +149,59 @@ async fn different_runtime_advanced() -> Result<()> {
149
149
// In this example, we will configure access to a remote object store
150
150
// over the network during the plan
151
151
152
- let ctx = SessionContext :: new ( )
153
- . enable_url_table ( ) ;
152
+ let ctx = SessionContext :: new ( ) . enable_url_table ( ) ;
154
153
155
154
// setup http object store
156
155
let base_url = Url :: parse ( "https://github.com" ) . unwrap ( ) ;
157
- let http_store = HttpBuilder :: new ( )
158
- . with_url ( base_url. clone ( ) )
159
- . build ( )
160
- . unwrap ( ) ;
156
+ let http_store = HttpBuilder :: new ( ) . with_url ( base_url. clone ( ) ) . build ( ) ?;
157
+
158
+ // By default, the object store will use the current runtime for IO operations;
159
+ // if we use a dedicated executor, object store requests will also use the
160
+ // dedicated executor's runtime
161
+ // Thus, wrap this object store in XXX that uses the "IO" runtime to do the IO
162
+ // (if we don't do this the example fails with an error like
163
+ //
164
+ // ctx.register_object_store(&base_url, Arc::new(http_store));
165
+ // A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.)
161
166
ctx. register_object_store ( & base_url, Arc :: new ( http_store) ) ;
162
167
163
- // register csv file with the execution context
164
- ctx. register_csv (
165
- "aggregate_test_100" ,
166
- "https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv" ,
167
- CsvReadOptions :: new ( ) ,
168
- )
169
- . await ?;
168
+ let dedicated_executor = DedicatedExecutorBuilder :: new ( ) . build ( ) ;
170
169
171
- // execute the query
172
- let df = ctx
173
- . sql ( "SELECT c1,c2,c3 FROM aggregate_test_100 LIMIT 5" )
174
- . await ?;
170
+ // Plan (and execute) the query on the dedicated runtime
171
+ let mut stream = dedicated_executor
172
+ . spawn ( async move {
173
+ // Plan / execute the query
174
+ let url = "https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv" ;
175
+ let df = ctx
176
+ . sql ( & format ! ( "SELECT c1,c2,c3 FROM '{url}' LIMIT 5" ) )
177
+ . await ?;
178
+ let stream: SendableRecordBatchStream = df. execute_stream ( ) . await ?;
175
179
176
- // print the results
177
- df . show ( ) . await ?;
180
+ Ok ( stream ) as Result < SendableRecordBatchStream >
181
+ } ) . await ? ?;
178
182
179
- // Note that special care must be taken when running Datafusion plans that do async io to transfer the work to their own thread pools.
183
+ // We have now planned the query on the dedicated runtime, but we still need to
184
+ // drive the stream (aka call `next()`) to get the results.
180
185
181
- // Using separate Runtime will avoid other requests being messed up but it won't really help requests made from DataSources such as
182
- // reading parquet files from object_store.
183
- //
184
- // Thus this runtime also disables IO so that we are sure there is no IO work being done on it.
186
+
187
+ // as mentioned above, calling `next()` (including indirectly by using
188
+ // FlightDataEncoder to convert the results to flight to send it over the
189
+ // network), will *still* result in the CPU work (and a bunch of spawned
190
+ // tasks) being done on the runtime calling next() (aka the current runtime)
191
+ // and not on the dedicated runtime.
192
+
193
+ // to drive the stream on the dedicated runtime, we need to wrap it using a XXX stream function
194
+
195
+ while let Some ( batch) = stream. next ( ) . await {
196
+ println ! ( "{}" , pretty_format_batches( & [ batch?] ) . unwrap( ) ) ;
197
+ }
185
198
186
199
Ok ( ( ) )
187
200
}
201
+
202
+
203
+
204
+
205
+
206
+ // TODO move this into the common runtime crate
207
+
0 commit comments