Skip to content

Commit bb0325f

Browse files
committed
Merge branch 'main' into arrow-avro
2 parents d5d615f + 2b7d4f9 commit bb0325f

13 files changed

Lines changed: 766 additions & 196 deletions

File tree

datafusion-examples/README.md

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -88,18 +88,19 @@ cargo run --example dataframe -- dataframe
8888

8989
#### Category: Single Process
9090

91-
| Subcommand | File Path | Description |
92-
| -------------------- | ----------------------------------------------------------------------------------------- | ------------------------------------------------------ |
93-
| catalog | [`data_io/catalog.rs`](examples/data_io/catalog.rs) | Register tables into a custom catalog |
94-
| json_shredding | [`data_io/json_shredding.rs`](examples/data_io/json_shredding.rs) | Implement filter rewriting for JSON shredding |
95-
| parquet_adv_idx | [`data_io/parquet_advanced_index.rs`](examples/data_io/parquet_advanced_index.rs) | Create a secondary index across multiple parquet files |
96-
| parquet_emb_idx | [`data_io/parquet_embedded_index.rs`](examples/data_io/parquet_embedded_index.rs) | Store a custom index inside Parquet files |
97-
| parquet_enc | [`data_io/parquet_encrypted.rs`](examples/data_io/parquet_encrypted.rs) | Read & write encrypted Parquet files |
98-
| parquet_enc_with_kms | [`data_io/parquet_encrypted_with_kms.rs`](examples/data_io/parquet_encrypted_with_kms.rs) | Encrypted Parquet I/O using a KMS-backed factory |
99-
| parquet_exec_visitor | [`data_io/parquet_exec_visitor.rs`](examples/data_io/parquet_exec_visitor.rs) | Extract statistics by visiting an ExecutionPlan |
100-
| parquet_idx | [`data_io/parquet_index.rs`](examples/data_io/parquet_index.rs) | Create a secondary index |
101-
| query_http_csv | [`data_io/query_http_csv.rs`](examples/data_io/query_http_csv.rs) | Query CSV files via HTTP |
102-
| remote_catalog | [`data_io/remote_catalog.rs`](examples/data_io/remote_catalog.rs) | Interact with a remote catalog |
91+
| Subcommand | File Path | Description |
92+
| ---------------------- | ----------------------------------------------------------------------------------------- | ------------------------------------------------------------------------- |
93+
| catalog | [`data_io/catalog.rs`](examples/data_io/catalog.rs) | Register tables into a custom catalog |
94+
| in_memory_object_store | [`data_io/in_memory_object_store.rs`](examples/data_io/in_memory_object_store.rs) | Read CSV from an in-memory object store (pattern applies to JSON/Parquet) |
95+
| json_shredding | [`data_io/json_shredding.rs`](examples/data_io/json_shredding.rs) | Implement filter rewriting for JSON shredding |
96+
| parquet_adv_idx | [`data_io/parquet_advanced_index.rs`](examples/data_io/parquet_advanced_index.rs) | Create a secondary index across multiple parquet files |
97+
| parquet_emb_idx | [`data_io/parquet_embedded_index.rs`](examples/data_io/parquet_embedded_index.rs) | Store a custom index inside Parquet files |
98+
| parquet_enc | [`data_io/parquet_encrypted.rs`](examples/data_io/parquet_encrypted.rs) | Read & write encrypted Parquet files |
99+
| parquet_enc_with_kms | [`data_io/parquet_encrypted_with_kms.rs`](examples/data_io/parquet_encrypted_with_kms.rs) | Encrypted Parquet I/O using a KMS-backed factory |
100+
| parquet_exec_visitor | [`data_io/parquet_exec_visitor.rs`](examples/data_io/parquet_exec_visitor.rs) | Extract statistics by visiting an ExecutionPlan |
101+
| parquet_idx | [`data_io/parquet_index.rs`](examples/data_io/parquet_index.rs) | Create a secondary index |
102+
| query_http_csv | [`data_io/query_http_csv.rs`](examples/data_io/query_http_csv.rs) | Query CSV files via HTTP |
103+
| remote_catalog | [`data_io/remote_catalog.rs`](examples/data_io/remote_catalog.rs) | Interact with a remote catalog |
103104

104105
## DataFrame Examples
105106

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! See `main.rs` for how to run it.
19+
//!
20+
//! This follows the recommended approach: implement the `ObjectStore` trait
21+
//! (or use an existing implementation), register it with DataFusion, and then
22+
//! read a URL "path" from that store.
23+
//! See the in-memory reference implementation:
24+
//! https://docs.rs/object_store/latest/object_store/memory/struct.InMemory.html
25+
26+
use std::sync::Arc;
27+
28+
use arrow::datatypes::{DataType, Field, Schema};
29+
use datafusion::assert_batches_eq;
30+
use datafusion::common::Result;
31+
use datafusion::execution::object_store::ObjectStoreUrl;
32+
use datafusion::prelude::{CsvReadOptions, SessionContext};
33+
use object_store::memory::InMemory;
34+
use object_store::path::Path;
35+
use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
36+
37+
/// Demonstrates reading CSV data from an in-memory object store.
38+
///
39+
/// The same pattern applies to JSON/Parquet: register a store for a URL
40+
/// prefix, write bytes into the store, then read via that URL.
41+
pub async fn in_memory_object_store() -> Result<()> {
42+
let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
43+
let ctx = SessionContext::new();
44+
let object_store_url = ObjectStoreUrl::parse("memory://")?;
45+
// Register a URL prefix to route reads through this object store.
46+
ctx.register_object_store(object_store_url.as_ref(), Arc::clone(&store));
47+
48+
let schema = Schema::new(vec![
49+
Field::new("id", DataType::Int64, false),
50+
Field::new("name", DataType::Utf8, false),
51+
]);
52+
53+
println!("=== CSV from memory ===");
54+
let csv_path = Path::from("/people.csv");
55+
let csv_data = b"id,name\n1,Alice\n2,Bob\n";
56+
// Write bytes into the in-memory object store.
57+
store
58+
.put(&csv_path, PutPayload::from_static(csv_data))
59+
.await?;
60+
// Read using the URL that matches the registered prefix.
61+
let csv = ctx
62+
.read_csv(
63+
"memory:///people.csv",
64+
CsvReadOptions::new().schema(&schema),
65+
)
66+
.await?
67+
.collect()
68+
.await?;
69+
#[rustfmt::skip]
70+
let expected = [
71+
"+----+-------+",
72+
"| id | name |",
73+
"+----+-------+",
74+
"| 1 | Alice |",
75+
"| 2 | Bob |",
76+
"+----+-------+",
77+
];
78+
assert_batches_eq!(expected, &csv);
79+
80+
Ok(())
81+
}

datafusion-examples/examples/data_io/main.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
//!
2222
//! ## Usage
2323
//! ```bash
24-
//! cargo run --example data_io -- [all|catalog|json_shredding|parquet_adv_idx|parquet_emb_idx|parquet_enc_with_kms|parquet_enc|parquet_exec_visitor|parquet_idx|query_http_csv|remote_catalog]
24+
//! cargo run --example data_io -- [all|catalog|in_memory_object_store|json_shredding|parquet_adv_idx|parquet_emb_idx|parquet_enc_with_kms|parquet_enc|parquet_exec_visitor|parquet_idx|query_http_csv|remote_catalog]
2525
//! ```
2626
//!
2727
//! Each subcommand runs a corresponding example:
@@ -30,6 +30,9 @@
3030
//! - `catalog`
3131
//! (file: catalog.rs, desc: Register tables into a custom catalog)
3232
//!
33+
//! - `in_memory_object_store`
34+
//! (file: in_memory_object_store.rs, desc: Read CSV from an in-memory object store (pattern applies to JSON/Parquet))
35+
//!
3336
//! - `json_shredding`
3437
//! (file: json_shredding.rs, desc: Implement filter rewriting for JSON shredding)
3538
//!
@@ -58,6 +61,7 @@
5861
//! (file: remote_catalog.rs, desc: Interact with a remote catalog)
5962
6063
mod catalog;
64+
mod in_memory_object_store;
6165
mod json_shredding;
6266
mod parquet_advanced_index;
6367
mod parquet_embedded_index;
@@ -77,6 +81,7 @@ use strum_macros::{Display, EnumIter, EnumString, VariantNames};
7781
enum ExampleKind {
7882
All,
7983
Catalog,
84+
InMemoryObjectStore,
8085
JsonShredding,
8186
ParquetAdvIdx,
8287
ParquetEmbIdx,
@@ -104,6 +109,9 @@ impl ExampleKind {
104109
}
105110
}
106111
ExampleKind::Catalog => catalog::catalog().await?,
112+
ExampleKind::InMemoryObjectStore => {
113+
in_memory_object_store::in_memory_object_store().await?
114+
}
107115
ExampleKind::JsonShredding => json_shredding::json_shredding().await?,
108116
ExampleKind::ParquetAdvIdx => {
109117
parquet_advanced_index::parquet_advanced_index().await?

0 commit comments

Comments
 (0)