@@ -39,17 +39,19 @@ use serial_test::serial;
39
39
use url:: Url ;
40
40
41
41
mod local {
42
+ use super :: * ;
42
43
use datafusion:: datasource:: source:: DataSourceExec ;
43
44
use datafusion:: { common:: stats:: Precision , datasource:: provider_as_source} ;
44
45
use datafusion_common:: tree_node:: { TreeNode , TreeNodeRecursion } ;
45
- use datafusion_expr:: { LogicalPlan , LogicalPlanBuilder , TableProviderFilterPushDown , TableScan } ;
46
+ use datafusion_expr:: {
47
+ LogicalPlan , LogicalPlanBuilder , TableProviderFilterPushDown , TableScan ,
48
+ } ;
46
49
use deltalake_core:: {
47
50
delta_datafusion:: DeltaLogicalCodec , logstore:: default_logstore, writer:: JsonWriter ,
48
51
} ;
49
52
use itertools:: Itertools ;
50
53
use object_store:: local:: LocalFileSystem ;
51
54
use TableProviderFilterPushDown :: { Exact , Inexact } ;
52
- use super :: * ;
53
55
#[ tokio:: test]
54
56
#[ serial]
55
57
async fn test_datafusion_local ( ) -> TestResult {
@@ -210,26 +212,10 @@ mod local {
210
212
}
211
213
}
212
214
213
- fn find_table_scan ( plan : & LogicalPlan ) -> Vec < & Expr > {
214
- let mut result = vec ! [ ] ;
215
-
216
- plan. apply ( |node| {
217
- if let LogicalPlan :: TableScan ( TableScan { ref filters, ..} ) = node {
218
- result = filters. iter ( ) . collect ( ) ;
219
- Ok ( TreeNodeRecursion :: Stop ) // Stop traversal once found
220
- } else {
221
- Ok ( TreeNodeRecursion :: Continue ) // Continue traversal
222
- }
223
- } ) . expect ( "Traversal should not fail" ) ;
224
-
225
- result
226
- }
227
-
228
215
#[ tokio:: test]
229
216
async fn test_datafusion_query_partitioned_pushdown ( ) -> Result < ( ) > {
230
217
let ctx = SessionContext :: new ( ) ;
231
- let table = open_table ( "../test/tests/data/delta-0.8.0-partitioned" )
232
- . await ?;
218
+ let table = open_table ( "../test/tests/data/delta-0.8.0-partitioned" ) . await ?;
233
219
ctx. register_table ( "demo" , Arc :: new ( table. clone ( ) ) ) ?;
234
220
235
221
let pruning_predicates = [
@@ -238,30 +224,40 @@ mod local {
238
224
PruningTestCase :: new ( "year = '2021'" ) ,
239
225
PruningTestCase :: with_push_down (
240
226
"year = '2021' AND day IS NOT NULL" ,
241
- vec ! [ Exact , Exact ]
227
+ vec ! [ Exact , Exact ] ,
242
228
) ,
243
229
PruningTestCase :: new ( "year IN ('2021', '2022')" ) ,
244
230
// NOT IN (a, b) is rewritten as (col != a AND col != b)
245
- PruningTestCase :: with_push_down (
246
- "year NOT IN ('2020', '2022')" ,
247
- vec ! [ Exact , Exact ]
248
- ) ,
231
+ PruningTestCase :: with_push_down ( "year NOT IN ('2020', '2022')" , vec ! [ Exact , Exact ] ) ,
249
232
// BETWEEN a AND b is rewritten as (col >= a AND col < b)
250
- PruningTestCase :: with_push_down (
251
- "year BETWEEN '2021' AND '2022'" ,
252
- vec ! [ Exact , Exact ]
253
- ) ,
233
+ PruningTestCase :: with_push_down ( "year BETWEEN '2021' AND '2022'" , vec ! [ Exact , Exact ] ) ,
254
234
PruningTestCase :: new ( "year NOT BETWEEN '2019' AND '2020'" ) ,
255
235
PruningTestCase :: with_push_down (
256
236
"year = '2021' AND day IN ('4', '5', '20')" ,
257
- vec ! [ Exact , Exact ]
237
+ vec ! [ Exact , Exact ] ,
258
238
) ,
259
239
PruningTestCase :: with_push_down (
260
240
"year = '2021' AND cast(day as int) <= 20" ,
261
- vec ! [ Exact , Inexact ]
241
+ vec ! [ Exact , Inexact ] ,
262
242
) ,
263
243
] ;
264
244
245
+ fn find_scan_filters ( plan : & LogicalPlan ) -> Vec < & Expr > {
246
+ let mut result = vec ! [ ] ;
247
+
248
+ plan. apply ( |node| {
249
+ if let LogicalPlan :: TableScan ( TableScan { ref filters, .. } ) = node {
250
+ result = filters. iter ( ) . collect ( ) ;
251
+ Ok ( TreeNodeRecursion :: Stop ) // Stop traversal once found
252
+ } else {
253
+ Ok ( TreeNodeRecursion :: Continue ) // Continue traversal
254
+ }
255
+ } )
256
+ . expect ( "Traversal should not fail" ) ;
257
+
258
+ result
259
+ }
260
+
265
261
for pp in pruning_predicates {
266
262
let pred = pp. sql ;
267
263
let sql = format ! ( "SELECT CAST( day as int ) as my_day FROM demo WHERE {pred} ORDER BY CAST( day as int ) ASC" ) ;
@@ -271,14 +267,12 @@ mod local {
271
267
272
268
// validate that we are correctly qualifying filters as Exact or Inexact
273
269
let plan = df. clone ( ) . into_optimized_plan ( ) ?;
274
- let filters = find_table_scan ( & plan) ;
270
+ let filters = find_scan_filters ( & plan) ;
275
271
let push_down = table. supports_filters_pushdown ( & filters) ?;
276
272
277
273
assert_eq ! ( push_down, pp. push_down) ;
278
274
279
- let batches = df
280
- . collect ( )
281
- . await ?;
275
+ let batches = df. collect ( ) . await ?;
282
276
283
277
let batch = & batches[ 0 ] ;
284
278
0 commit comments