-
Notifications
You must be signed in to change notification settings - Fork 33
/
Copy pathflight-sql.rs
74 lines (70 loc) · 2.45 KB
/
flight-sql.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use datafusion::prelude::SessionContext;
use datafusion_table_providers::flight::sql::{FlightSqlDriver, QUERY};
use datafusion_table_providers::flight::FlightTableFactory;
use std::collections::HashMap;
use std::sync::Arc;
/// Example: querying a remote Flight SQL endpoint through DataFusion.
///
/// The same remote query is exposed twice: first by opening the table
/// programmatically and registering it as `trip_data`, then by registering the
/// factory under the `FLIGHT_SQL` format name and declaring `trip_data2` via DDL.
///
/// # Prerequisites
///
/// A Flight SQL server exposing a `taxi` table must be reachable at
/// `http://localhost:32010`, for example:
///
/// ```bash
/// $ brew install roapi
/// $ roapi -t taxi=https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
/// ```
///
/// # Errors
///
/// Any failure while opening the table, registering it, planning or running a
/// query is propagated to the caller.
#[tokio::main]
async fn main() -> datafusion::common::Result<()> {
    // Address of the Flight SQL server used by the programmatic path.
    const ENDPOINT: &str = "http://localhost:32010";

    // DDL declaring the same remote query as an external table.
    const CREATE_TABLE_SQL: &str = r#"
CREATE EXTERNAL TABLE trip_data2 STORED AS FLIGHT_SQL
LOCATION 'http://localhost:32010'
OPTIONS (
'flight.sql.query' 'SELECT * FROM taxi'
)
"#;

    // Aggregation executed against the DDL-created table.
    const SUMMARY_SQL: &str = r#"
SELECT "VendorID", COUNT(*), SUM(passenger_count), SUM(total_amount)
FROM trip_data2
GROUP BY "VendorID"
ORDER BY COUNT(*) DESC
"#;

    let ctx = SessionContext::new();
    let factory = FlightTableFactory::new(Arc::new(FlightSqlDriver::new()));

    // Path 1: open the table through the factory API and register it by hand.
    let remote_table = factory
        .open_table(
            ENDPOINT,
            HashMap::from([(QUERY.into(), "SELECT * FROM taxi".into())]),
        )
        .await?;
    ctx.register_table("trip_data", Arc::new(remote_table))?;

    let preview = ctx.sql("select * from trip_data limit 10").await?;
    preview.show().await?;

    // Path 2: the same table created using DDL. The write guard is a temporary
    // of this single statement, so it is released before the next `.await`.
    let state = ctx.state_ref();
    state
        .write()
        .table_factories_mut()
        .insert("FLIGHT_SQL".into(), Arc::new(factory));

    // The frame returned by the DDL statement carries nothing we need.
    let _ = ctx.sql(CREATE_TABLE_SQL).await?;

    let summary = ctx.sql(SUMMARY_SQL).await?;

    // Print the plan first, then the actual result rows.
    let plan = summary.clone().explain(false, false)?;
    plan.show().await?;
    summary.show().await?;
    Ok(())
}