 // specific language governing permissions and limitations
 // under the License.

+use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::error::Result;
 use datafusion::prelude::*;
 use std::fs::File;
 use std::io::Write;
+use std::sync::Arc;
 use tempfile::tempdir;

-/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
-/// fetching results, using the DataFrame trait
+/// This example demonstrates using DataFusion's DataFrame API to
+///
+/// * [read_parquet]: execute queries against parquet files
+/// * [read_csv]: execute queries against csv files
+/// * [read_memory]: execute queries against in-memory arrow data
 #[tokio::main]
 async fn main() -> Result<()> {
-    // create local execution context
+    // The SessionContext is the main high-level API for interacting with DataFusion
     let ctx = SessionContext::new();
+    read_parquet(&ctx).await?;
+    read_csv(&ctx).await?;
+    read_memory(&ctx).await?;
+    Ok(())
+}

+/// Use the DataFrame API to
+/// 1. Read parquet files
+/// 2. Show the schema
+/// 3. Select columns and rows
+async fn read_parquet(ctx: &SessionContext) -> Result<()> {
+    // Find the local path of "alltypes_plain.parquet"
     let testdata = datafusion::test_util::parquet_test_data();
-
     let filename = &format!("{testdata}/alltypes_plain.parquet");

-    // define the query using the DataFrame trait
-    let df = ctx
-        .read_parquet(filename, ParquetReadOptions::default())
-        .await?
-        .select_columns(&["id", "bool_col", "timestamp_col"])?
-        .filter(col("id").gt(lit(1)))?;
-
-    // print the results
-    df.show().await?;
-
-    // create a csv file waiting to be written
-    let dir = tempdir()?;
-    let file_path = dir.path().join("example.csv");
-    let file = File::create(&file_path)?;
-    write_csv_file(file);
-
-    // Reading CSV file with inferred schema example
-    let csv_df =
-        example_read_csv_file_with_inferred_schema(file_path.to_str().unwrap()).await;
-    csv_df.show().await?;
-
-    // Reading CSV file with defined schema
-    let csv_df = example_read_csv_file_with_schema(file_path.to_str().unwrap()).await;
-    csv_df.show().await?;
-
-    // Reading PARQUET file and print describe
+    // Read the parquet file
     let parquet_df = ctx
         .read_parquet(filename, ParquetReadOptions::default())
         .await?;
-    parquet_df.describe().await.unwrap().show().await?;

-    let dyn_ctx = ctx.enable_url_table();
-    let df = dyn_ctx
-        .sql(&format!("SELECT * FROM '{}'", file_path.to_str().unwrap()))
+    // Show its schema using 'describe'
+    parquet_df.clone().describe().await?.show().await?;
+
+    // Select three columns and filter the results
+    // so that only rows where id > 1 are returned
+    parquet_df
+        .select_columns(&["id", "bool_col", "timestamp_col"])?
+        .filter(col("id").gt(lit(1)))?
+        .show()
         .await?;
-    df.show().await?;

     Ok(())
 }

-// Function to create an test CSV file
-fn write_csv_file(mut file: File) {
-    // Create the data to put into the csv file with headers
-    let content = r#"id,time,vote,unixtime,rating
-a1,"10 6, 2013",3,1381017600,5.0
-a2,"08 9, 2013",2,1376006400,4.5"#;
-    // write the data
-    file.write_all(content.as_ref())
-        .expect("Problem with writing file!");
-}
+/// Use the DataFrame API to
+/// 1. Read CSV files
+/// 2. Optionally specify the schema
+async fn read_csv(ctx: &SessionContext) -> Result<()> {
+    // create an example.csv file in a temporary directory
+    let dir = tempdir()?;
+    let file_path = dir.path().join("example.csv");
+    {
+        let mut file = File::create(&file_path)?;
+        // write CSV data
+        file.write_all(
+            r#"id,time,vote,unixtime,rating
+a1,"10 6, 2013",3,1381017600,5.0
+a2,"08 9, 2013",2,1376006400,4.5"#
+                .as_bytes(),
+        )?;
+    } // scope closes the file
+    let file_path = file_path.to_str().unwrap();

-// Example to read data from a csv file with inferred schema
-async fn example_read_csv_file_with_inferred_schema(file_path: &str) -> DataFrame {
-    // Create a session context
-    let ctx = SessionContext::new();
-    // Register a lazy DataFrame using the context
-    ctx.read_csv(file_path, CsvReadOptions::default())
-        .await
-        .unwrap()
-}
+    // You can read a CSV file and DataFusion will infer the schema automatically
+    let csv_df = ctx.read_csv(file_path, CsvReadOptions::default()).await?;
+    csv_df.show().await?;

-// Example to read csv file with a defined schema for the csv file
-async fn example_read_csv_file_with_schema(file_path: &str) -> DataFrame {
-    // Create a session context
-    let ctx = SessionContext::new();
-    // Define the schema
+    // If you know the types of your data you can specify them explicitly
     let schema = Schema::new(vec![
         Field::new("id", DataType::Utf8, false),
         Field::new("time", DataType::Utf8, false),
@@ -112,6 +104,38 @@ async fn example_read_csv_file_with_schema(file_path: &str) -> DataFrame {
         schema: Some(&schema),
         ..Default::default()
     };
-    // Register a lazy DataFrame by using the context and option provider
-    ctx.read_csv(file_path, csv_read_option).await.unwrap()
+    let csv_df = ctx.read_csv(file_path, csv_read_option).await?;
+    csv_df.show().await?;
+
+    // You can also create DataFrames from the results of SQL queries,
+    // and `enable_url_table` lets those queries refer to local files directly
+    let dyn_ctx = ctx.clone().enable_url_table();
+    let csv_df = dyn_ctx
+        .sql(&format!("SELECT rating, unixtime FROM '{}'", file_path))
+        .await?;
+    csv_df.show().await?;
+
+    Ok(())
+}
+
+/// Use the DataFrame API to
+/// 1. Read in-memory data
+async fn read_memory(ctx: &SessionContext) -> Result<()> {
+    // define data in memory
+    let a: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
+    let b: ArrayRef = Arc::new(Int32Array::from(vec![1, 10, 10, 100]));
+    let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])?;
+
+    // declare a table in memory. In the Apache Spark API, this corresponds to createDataFrame(...)
+    ctx.register_batch("t", batch)?;
+    let df = ctx.table("t").await?;
+
+    // construct an expression corresponding to "SELECT a, b FROM t WHERE b = 10" in SQL
+    let filter = col("b").eq(lit(10));
+    let df = df.select_columns(&["a", "b"])?.filter(filter)?;
+
+    // print the results
+    df.show().await?;
+
+    Ok(())
 }
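For readers following along: the in-memory pattern from `read_memory` above also works as a standalone program. The following is a minimal, illustrative sketch (not part of this change), assuming a binary crate with `datafusion`, `arrow`, and `tokio` declared as dependencies:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Build a RecordBatch in memory: a string column "a" and an Int32 column "b"
    let a: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
    let b: ArrayRef = Arc::new(Int32Array::from(vec![1, 10, 10, 100]));
    let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])?;

    // Register the batch as table "t", then filter and print it,
    // equivalent to "SELECT a, b FROM t WHERE b = 10"
    ctx.register_batch("t", batch)?;
    ctx.table("t")
        .await?
        .select_columns(&["a", "b"])?
        .filter(col("b").eq(lit(10)))?
        .show()
        .await?;

    Ok(())
}
```

Within the DataFusion repository itself, the full example should be runnable with `cargo run --example dataframe` from the `datafusion-examples` directory.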