|
| 1 | +use datafusion::catalog::TableProvider; |
| 2 | +use datafusion::datasource::listing::{ |
| 3 | + ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, |
| 4 | +}; |
| 5 | +use datafusion::prelude::SessionContext; |
| 6 | +use datafusion_ffi::table_provider::FFI_TableProvider; |
| 7 | +use geodatafusion_flatgeobuf::FlatGeobufFormat; |
| 8 | +use pyo3::prelude::*; |
| 9 | +use pyo3::types::PyCapsule; |
| 10 | +use pyo3::{Bound, PyResult, Python, pyclass, pymethods}; |
| 11 | +use pyo3_async_runtimes::tokio::get_runtime; |
| 12 | +use std::sync::Arc; |
| 13 | + |
| 14 | +#[pyfunction] |
| 15 | +pub(crate) fn new_flatgeobuf(path: &str) -> PyFlatGeobufTableProvider { |
| 16 | + let format = Arc::new(FlatGeobufFormat::default()); |
| 17 | + |
| 18 | + let options = ListingOptions::new(format).with_file_extension(".fgb"); |
| 19 | + |
| 20 | + let table_path = ListingTableUrl::parse(path).unwrap(); |
| 21 | + |
| 22 | + let state = SessionContext::new().state(); |
| 23 | + let runtime = get_runtime(); |
| 24 | + let inferred_schema = |
| 25 | + runtime.block_on(async { options.infer_schema(&state, &table_path).await.unwrap() }); |
| 26 | + |
| 27 | + let config = ListingTableConfig::new(table_path) |
| 28 | + .with_listing_options(options) |
| 29 | + .with_schema(inferred_schema); |
| 30 | + |
| 31 | + let table = ListingTable::try_new(config).unwrap(); |
| 32 | + PyFlatGeobufTableProvider(Arc::new(table)) |
| 33 | +} |
| 34 | + |
| 35 | +#[pyclass(module = "geodatafusion", name = "FlatGeobufTableProvider", frozen)] |
| 36 | +pub(crate) struct PyFlatGeobufTableProvider(Arc<dyn TableProvider + Send>); |
| 37 | + |
| 38 | +#[pymethods] |
| 39 | +impl PyFlatGeobufTableProvider { |
| 40 | + pub fn __datafusion_table_provider__<'py>( |
| 41 | + &self, |
| 42 | + py: Python<'py>, |
| 43 | + ) -> PyResult<Bound<'py, PyCapsule>> { |
| 44 | + let name = cr"datafusion_table_provider".into(); |
| 45 | + |
| 46 | + let provider = FFI_TableProvider::new(self.0.clone(), false, None); |
| 47 | + |
| 48 | + PyCapsule::new(py, provider, Some(name)) |
| 49 | + } |
| 50 | +} |
0 commit comments