Skip to content

Commit 28d50f5

Browse files
committed
feat: Add BallistaBuilder for custom object store configuration
- Introduced a new `BallistaBuilder` struct to facilitate the creation of Ballista session contexts with pre-configured object stores.
- Updated dependencies in `Cargo.toml` to use specific versions for `datafusion` and `pyo3`.
- Refactored `build.rs` to remove unused modules and streamline the build process.
- Added an example demonstrating the usage of `BallistaBuilder` with a pre-created S3 object store.
- Implemented methods in `BallistaBuilder` for setting session configurations, registering object stores, and creating contexts for remote and standalone execution.
1 parent 62c381a commit 28d50f5

9 files changed

Lines changed: 2623 additions & 837 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ballista/client/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ ballista-executor = { path = "../executor", version = "51.0.0", optional = true
3434
ballista-scheduler = { path = "../scheduler", version = "51.0.0", optional = true }
3535
datafusion = { workspace = true }
3636
log = { workspace = true }
37+
object_store = { workspace = true }
3738

3839
tokio = { workspace = true }
3940
tonic = { workspace = true }

ballista/client/src/builder.rs

Lines changed: 295 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Builder API for creating Ballista session contexts with custom object stores.
19+
//!
20+
//! This module provides a fluent builder pattern for configuring Ballista sessions,
21+
//! including the ability to register pre-created object stores with custom authentication.
22+
//!
23+
//! # Example
24+
//!
25+
//! ```no_run
26+
//! use ballista::prelude::BallistaBuilder;
27+
//! use object_store::aws::AmazonS3Builder;
28+
//! use std::sync::Arc;
29+
//!
30+
//! # #[tokio::main]
31+
//! # async fn main() -> datafusion::error::Result<()> {
32+
//! // Create an object store with custom authentication
33+
//! let s3_store = AmazonS3Builder::new()
34+
//! .with_bucket_name("my-bucket")
35+
//! .with_region("us-east-1")
36+
//! // Custom authentication bridge
37+
//! .with_access_key_id("my-access-key")
38+
//! .with_secret_access_key("my-secret-key")
39+
//! .build()?;
40+
//!
41+
//! let ctx = BallistaBuilder::new()
42+
//! .with_object_store("s3://my-bucket", Arc::new(s3_store))
43+
//! .standalone()
44+
//! .await?;
45+
//! # Ok(())
46+
//! # }
47+
//! ```
48+
49+
use crate::extension::SessionContextExt;
50+
use ballista_core::extension::SessionConfigExt;
51+
use datafusion::error::{DataFusionError, Result};
52+
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
53+
use datafusion::execution::SessionStateBuilder;
54+
use datafusion::prelude::{SessionConfig, SessionContext};
55+
use object_store::ObjectStore;
56+
use std::sync::Arc;
57+
use url::Url;
58+
59+
/// A builder for creating Ballista session contexts with custom configuration.
60+
///
61+
/// This builder provides a fluent API for configuring Ballista sessions,
62+
/// including support for registering pre-created object stores with custom
63+
/// authentication.
64+
///
65+
/// # Example
66+
///
67+
/// ```no_run
68+
/// use ballista::prelude::BallistaBuilder;
69+
/// use object_store::aws::AmazonS3Builder;
70+
/// use std::sync::Arc;
71+
///
72+
/// # #[tokio::main]
73+
/// # async fn main() -> datafusion::error::Result<()> {
74+
/// let s3_store = AmazonS3Builder::new()
75+
/// .with_bucket_name("my-bucket")
76+
/// .with_region("us-east-1")
77+
/// .with_access_key_id("my-access-key")
78+
/// .with_secret_access_key("my-secret-key")
79+
/// .build()?;
80+
///
81+
/// let ctx = BallistaBuilder::new()
82+
/// .with_object_store("s3://my-bucket", Arc::new(s3_store))
83+
/// .remote("df://localhost:50050")
84+
/// .await?;
85+
/// # Ok(())
86+
/// # }
87+
/// ```
88+
#[derive(Default, Clone)]
89+
pub struct BallistaBuilder {
90+
session_config: SessionConfig,
91+
object_stores: Vec<(Url, Arc<dyn ObjectStore>)>,
92+
}
93+
94+
impl BallistaBuilder {
95+
/// Creates a new [`BallistaBuilder`] with default configuration.
96+
pub fn new() -> Self {
97+
Self {
98+
session_config: SessionConfig::new_with_ballista(),
99+
object_stores: Vec::new(),
100+
}
101+
}
102+
103+
/// Sets the session configuration.
104+
///
105+
/// This replaces any previously set configuration.
106+
pub fn with_config(mut self, config: SessionConfig) -> Self {
107+
self.session_config = config;
108+
self
109+
}
110+
111+
/// Sets a configuration option by key-value pair.
112+
///
113+
/// # Example
114+
///
115+
/// ```no_run
116+
/// use ballista::prelude::BallistaBuilder;
117+
///
118+
/// # #[tokio::main]
119+
/// # async fn main() -> datafusion::error::Result<()> {
120+
/// let ctx = BallistaBuilder::new()
121+
/// .config("datafusion.execution.target_partitions", "8")?
122+
/// .standalone()
123+
/// .await?;
124+
/// # Ok(())
125+
/// # }
126+
/// ```
127+
pub fn config(mut self, key: &str, value: &str) -> Result<Self> {
128+
self.session_config
129+
.options_mut()
130+
.set(key, value)
131+
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;
132+
Ok(self)
133+
}
134+
135+
/// Registers a pre-created object store for a given URL prefix.
136+
///
137+
/// This allows you to pass in object stores that have been configured
138+
/// with custom authentication (e.g., via an authentication bridge).
139+
///
140+
/// The URL should be the base URL for the object store, such as:
141+
/// - `s3://my-bucket` for S3
142+
/// - `az://my-container` for Azure Blob Storage
143+
/// - `gs://my-bucket` for Google Cloud Storage
144+
///
145+
/// # Example
146+
///
147+
/// ```no_run
148+
/// use ballista::prelude::BallistaBuilder;
149+
/// use object_store::aws::AmazonS3Builder;
150+
/// use std::sync::Arc;
151+
///
152+
/// # #[tokio::main]
153+
/// # async fn main() -> datafusion::error::Result<()> {
154+
/// let s3_store = AmazonS3Builder::new()
155+
/// .with_bucket_name("my-bucket")
156+
/// .with_region("us-east-1")
157+
/// .with_access_key_id("my-access-key")
158+
/// .with_secret_access_key("my-secret-key")
159+
/// .build()?;
160+
///
161+
/// let ctx = BallistaBuilder::new()
162+
/// .with_object_store("s3://my-bucket", Arc::new(s3_store))
163+
/// .standalone()
164+
/// .await?;
165+
/// # Ok(())
166+
/// # }
167+
/// ```
168+
pub fn with_object_store(mut self, url: &str, store: Arc<dyn ObjectStore>) -> Self {
169+
// Parse the URL, or store it for later error handling during build
170+
if let Ok(parsed_url) = Url::parse(url) {
171+
self.object_stores.push((parsed_url, store));
172+
} else {
173+
// We'll handle invalid URLs during build
174+
log::warn!("Invalid object store URL: {}", url);
175+
}
176+
self
177+
}
178+
179+
/// Registers a pre-created object store for a given URL.
180+
///
181+
/// This is the same as [`with_object_store`](Self::with_object_store) but takes
182+
/// a pre-parsed [`Url`] instead of a string.
183+
pub fn with_object_store_url(
184+
mut self,
185+
url: Url,
186+
store: Arc<dyn ObjectStore>,
187+
) -> Self {
188+
self.object_stores.push((url, store));
189+
self
190+
}
191+
192+
/// Sets the Ballista job name.
193+
///
194+
/// # Example
195+
///
196+
/// ```no_run
197+
/// use ballista::prelude::BallistaBuilder;
198+
///
199+
/// # #[tokio::main]
200+
/// # async fn main() -> datafusion::error::Result<()> {
201+
/// let ctx = BallistaBuilder::new()
202+
/// .with_job_name("My Analytics Job")
203+
/// .standalone()
204+
/// .await?;
205+
/// # Ok(())
206+
/// # }
207+
/// ```
208+
pub fn with_job_name(mut self, name: &str) -> Self {
209+
self.session_config = self.session_config.with_ballista_job_name(name);
210+
self
211+
}
212+
213+
/// Sets the target number of partitions for query execution.
214+
pub fn with_target_partitions(mut self, partitions: usize) -> Self {
215+
self.session_config = self.session_config.with_target_partitions(partitions);
216+
self
217+
}
218+
219+
/// Sets the batch size for query execution.
220+
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
221+
self.session_config = self.session_config.with_batch_size(batch_size);
222+
self
223+
}
224+
225+
/// Enables or disables the information schema.
226+
pub fn with_information_schema(mut self, enabled: bool) -> Self {
227+
self.session_config = self.session_config.with_information_schema(enabled);
228+
self
229+
}
230+
231+
/// Builds a [`SessionState`] with the configured object stores.
232+
fn build_state(&self) -> Result<datafusion::execution::SessionState> {
233+
let runtime_env = RuntimeEnvBuilder::new().build()?;
234+
235+
// Register all object stores
236+
for (url, store) in &self.object_stores {
237+
runtime_env.register_object_store(url, Arc::clone(store));
238+
}
239+
240+
let state = SessionStateBuilder::new()
241+
.with_config(self.session_config.clone())
242+
.with_runtime_env(Arc::new(runtime_env))
243+
.with_default_features()
244+
.build();
245+
246+
Ok(state)
247+
}
248+
249+
/// Creates a context for executing queries against a remote Ballista scheduler.
250+
///
251+
/// # Arguments
252+
///
253+
/// * `url` - The URL of the Ballista scheduler (e.g., "df://localhost:50050")
254+
///
255+
/// # Example
256+
///
257+
/// ```no_run
258+
/// use ballista::prelude::BallistaBuilder;
259+
///
260+
/// # #[tokio::main]
261+
/// # async fn main() -> datafusion::error::Result<()> {
262+
/// let ctx = BallistaBuilder::new()
263+
/// .remote("df://localhost:50050")
264+
/// .await?;
265+
/// # Ok(())
266+
/// # }
267+
/// ```
268+
pub async fn remote(self, url: &str) -> Result<SessionContext> {
269+
let state = self.build_state()?;
270+
SessionContext::remote_with_state(url, state).await
271+
}
272+
273+
/// Creates a context for executing queries against a standalone Ballista cluster.
274+
///
275+
/// This starts a local Ballista cluster with a scheduler and executor in-process.
276+
///
277+
/// # Example
278+
///
279+
/// ```no_run
280+
/// use ballista::prelude::BallistaBuilder;
281+
///
282+
/// # #[tokio::main]
283+
/// # async fn main() -> datafusion::error::Result<()> {
284+
/// let ctx = BallistaBuilder::new()
285+
/// .standalone()
286+
/// .await?;
287+
/// # Ok(())
288+
/// # }
289+
/// ```
290+
#[cfg(feature = "standalone")]
291+
pub async fn standalone(self) -> Result<SessionContext> {
292+
let state = self.build_state()?;
293+
SessionContext::standalone_with_state(state).await
294+
}
295+
}

ballista/client/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
#![doc = include_str!("../README.md")]
1919
#![warn(missing_docs)]
2020

21+
/// Builder API for creating Ballista session contexts with custom object stores.
22+
pub mod builder;
2123
/// Extension traits for integrating DataFusion with Ballista distributed execution.
2224
pub mod extension;
2325
/// Prelude module providing commonly used imports for Ballista client applications.

ballista/client/src/prelude.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,6 @@
2222
// error::{BallistaError, Result},
2323
// };
2424

25+
pub use crate::builder::BallistaBuilder;
2526
pub use crate::extension::{SessionConfigExt, SessionContextExt};
2627
//pub use futures::StreamExt;

0 commit comments

Comments
 (0)