Skip to content

Commit d043f63

Browse files
committed
Deprecate RuntimeConfig, update code to use new builder style
1 parent dc4ae20 commit d043f63

File tree

11 files changed

+81
-77
lines changed

11 files changed

+81
-77
lines changed

benchmarks/src/bin/external_aggr.rs

+9-3
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ use datafusion::datasource::{MemTable, TableProvider};
3333
use datafusion::error::Result;
3434
use datafusion::execution::memory_pool::FairSpillPool;
3535
use datafusion::execution::memory_pool::{human_readable_size, units};
36-
use datafusion::execution::runtime_env::RuntimeConfig;
36+
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
37+
use datafusion::execution::SessionStateBuilder;
3738
use datafusion::physical_plan::display::DisplayableExecutionPlan;
3839
use datafusion::physical_plan::{collect, displayable};
3940
use datafusion::prelude::*;
@@ -195,10 +196,15 @@ impl ExternalAggrConfig {
195196
let query_name =
196197
format!("Q{query_id}({})", human_readable_size(mem_limit as usize));
197198
let config = self.common.config();
198-
let runtime_config = RuntimeConfig::new()
199+
let runtime_env = RuntimeEnvBuilder::new()
199200
.with_memory_pool(Arc::new(FairSpillPool::new(mem_limit as usize)))
200201
.build_arc()?;
201-
let ctx = SessionContext::new_with_config_rt(config, runtime_config);
202+
let state = SessionStateBuilder::new()
203+
.with_config(config)
204+
.with_runtime_env(runtime_env)
205+
.with_default_features()
206+
.build();
207+
let ctx = SessionContext::from(state);
202208

203209
// register tables
204210
self.register_tables(&ctx).await?;

benchmarks/src/sort_tpch.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use datafusion::datasource::listing::{
3232
};
3333
use datafusion::datasource::{MemTable, TableProvider};
3434
use datafusion::error::Result;
35-
use datafusion::execution::runtime_env::RuntimeConfig;
35+
use datafusion::execution::SessionStateBuilder;
3636
use datafusion::physical_plan::display::DisplayableExecutionPlan;
3737
use datafusion::physical_plan::{displayable, execute_stream};
3838
use datafusion::prelude::*;
@@ -188,9 +188,11 @@ impl RunOpt {
188188
/// Benchmark query `query_id` in `SORT_QUERIES`
189189
async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
190190
let config = self.common.config();
191-
192-
let runtime_config = RuntimeConfig::new().build_arc()?;
193-
let ctx = SessionContext::new_with_config_rt(config, runtime_config);
191+
let state = SessionStateBuilder::new()
192+
.with_config(config)
193+
.with_default_features()
194+
.build();
195+
let ctx = SessionContext::from(state);
194196

195197
// register tables
196198
self.register_tables(&ctx).await?;

datafusion-cli/src/main.rs

+14-23
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ use std::sync::{Arc, OnceLock};
2323

2424
use datafusion::error::{DataFusionError, Result};
2525
use datafusion::execution::context::SessionConfig;
26-
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool};
27-
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
26+
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool};
27+
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
2828
use datafusion::prelude::SessionContext;
2929
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
3030
use datafusion_cli::functions::ParquetMetadataFunc;
@@ -156,27 +156,22 @@ async fn main_inner() -> Result<()> {
156156
session_config = session_config.with_batch_size(batch_size);
157157
};
158158

159-
let rt_config = RuntimeConfig::new();
160-
let rt_config =
161-
// set memory pool size
162-
if let Some(memory_limit) = args.memory_limit {
163-
// set memory pool type
164-
match args.mem_pool_type {
165-
PoolType::Fair => rt_config
166-
.with_memory_pool(Arc::new(FairSpillPool::new(memory_limit))),
167-
PoolType::Greedy => rt_config
168-
.with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit)))
169-
}
170-
} else {
171-
rt_config
159+
let mut rt_builder = RuntimeEnvBuilder::new();
160+
// set memory pool size
161+
if let Some(memory_limit) = args.memory_limit {
162+
// set memory pool type
163+
let pool: Arc<dyn MemoryPool> = match args.mem_pool_type {
164+
PoolType::Fair => Arc::new(FairSpillPool::new(memory_limit)),
165+
PoolType::Greedy => Arc::new(GreedyMemoryPool::new(memory_limit)),
172166
};
167+
rt_builder = rt_builder.with_memory_pool(pool)
168+
}
173169

174-
let runtime_env = create_runtime_env(rt_config.clone())?;
170+
let runtime_env = rt_builder.build_arc()?;
175171

176172
// enable dynamic file query
177-
let ctx =
178-
SessionContext::new_with_config_rt(session_config.clone(), Arc::new(runtime_env))
179-
.enable_url_table();
173+
let ctx = SessionContext::new_with_config_rt(session_config, runtime_env)
174+
.enable_url_table();
180175
ctx.refresh_catalogs().await?;
181176
// install dynamic catalog provider that can register required object stores
182177
ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new(
@@ -231,10 +226,6 @@ async fn main_inner() -> Result<()> {
231226
Ok(())
232227
}
233228

234-
fn create_runtime_env(rn_config: RuntimeConfig) -> Result<RuntimeEnv> {
235-
RuntimeEnv::try_new(rn_config)
236-
}
237-
238229
fn parse_valid_file(dir: &str) -> Result<String, String> {
239230
if Path::new(dir).is_file() {
240231
Ok(dir.to_string())

datafusion/core/src/datasource/file_format/csv.rs

-3
Original file line numberDiff line numberDiff line change
@@ -753,7 +753,6 @@ mod tests {
753753
use datafusion_common::cast::as_string_array;
754754
use datafusion_common::internal_err;
755755
use datafusion_common::stats::Precision;
756-
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
757756
use datafusion_expr::{col, lit};
758757

759758
use chrono::DateTime;
@@ -984,12 +983,10 @@ mod tests {
984983
async fn query_compress_data(
985984
file_compression_type: FileCompressionType,
986985
) -> Result<()> {
987-
let runtime = Arc::new(RuntimeEnvBuilder::new().build()?);
988986
let mut cfg = SessionConfig::new();
989987
cfg.options_mut().catalog.has_header = true;
990988
let session_state = SessionStateBuilder::new()
991989
.with_config(cfg)
992-
.with_runtime_env(runtime)
993990
.with_default_features()
994991
.build();
995992
let integration = LocalFileSystem::new_with_prefix(arrow_test_data()).unwrap();

datafusion/core/src/execution/context/mod.rs

-3
Original file line numberDiff line numberDiff line change
@@ -1792,7 +1792,6 @@ mod tests {
17921792
use super::{super::options::CsvReadOptions, *};
17931793
use crate::assert_batches_eq;
17941794
use crate::execution::memory_pool::MemoryConsumer;
1795-
use crate::execution::runtime_env::RuntimeEnvBuilder;
17961795
use crate::test;
17971796
use crate::test_util::{plan_and_collect, populate_csv_partitions};
17981797
use arrow_schema::{DataType, TimeUnit};
@@ -1932,14 +1931,12 @@ mod tests {
19321931
let path = path.join("tests/tpch-csv");
19331932
let url = format!("file://{}", path.display());
19341933

1935-
let runtime = RuntimeEnvBuilder::new().build_arc()?;
19361934
let cfg = SessionConfig::new()
19371935
.set_str("datafusion.catalog.location", url.as_str())
19381936
.set_str("datafusion.catalog.format", "CSV")
19391937
.set_str("datafusion.catalog.has_header", "true");
19401938
let session_state = SessionStateBuilder::new()
19411939
.with_config(cfg)
1942-
.with_runtime_env(runtime)
19431940
.with_default_features()
19441941
.build();
19451942
let ctx = SessionContext::new_with_state(session_state);

datafusion/execution/src/disk_manager.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! Manages files generated during query execution, files are
19-
//! hashed among the directories listed in RuntimeConfig::local_dirs.
18+
//! [`DiskManager`]: Manages files generated during query execution
2019
2120
use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
2221
use log::debug;

datafusion/execution/src/memory_pool/pool.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ pub struct GreedyMemoryPool {
6262
}
6363

6464
impl GreedyMemoryPool {
65-
/// Allocate up to `limit` bytes
65+
/// Create a new pool that can allocate up to `pool_size` bytes
6666
pub fn new(pool_size: usize) -> Self {
6767
debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
6868
Self {

datafusion/execution/src/runtime_env.rs

+44-28
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,32 @@ use url::Url;
4141
/// Execution runtime environment that manages system resources such
4242
/// as memory, disk, cache and storage.
4343
///
44-
/// A [`RuntimeEnv`] is created from a [`RuntimeEnvBuilder`] and has the
44+
/// A [`RuntimeEnv`] can be created using [`RuntimeEnvBuilder`] and has the
4545
/// following resource management functionality:
4646
///
4747
/// * [`MemoryPool`]: Manage memory
4848
/// * [`DiskManager`]: Manage temporary files on local disk
4949
/// * [`CacheManager`]: Manage temporary cache data during the session lifetime
5050
/// * [`ObjectStoreRegistry`]: Manage mapping URLs to object store instances
51+
///
52+
/// # Example: Create default `RuntimeEnv`
53+
/// ```
54+
/// # use datafusion_execution::runtime_env::RuntimeEnv;
55+
/// let runtime_env = RuntimeEnv::default();
56+
/// ```
57+
///
58+
/// # Example: Create a `RuntimeEnv` from [`RuntimeEnvBuilder`] with a new memory pool
59+
/// ```
60+
/// # use std::sync::Arc;
61+
/// # use datafusion_execution::memory_pool::GreedyMemoryPool;
62+
/// # use datafusion_execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
63+
/// // restrict to using at most 100MB of memory
64+
/// let pool_size = 100 * 1024 * 1024;
65+
/// let runtime_env = RuntimeEnvBuilder::new()
66+
/// .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
67+
/// .build()
68+
/// .unwrap();
69+
/// ```
5170
pub struct RuntimeEnv {
5271
/// Runtime memory management
5372
pub memory_pool: Arc<dyn MemoryPool>,
@@ -66,28 +85,16 @@ impl Debug for RuntimeEnv {
6685
}
6786

6887
impl RuntimeEnv {
69-
#[deprecated(since = "43.0.0", note = "please use `try_new` instead")]
88+
#[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder` instead")]
89+
#[allow(deprecated)]
7090
pub fn new(config: RuntimeConfig) -> Result<Self> {
7191
Self::try_new(config)
7292
}
7393
/// Create env based on configuration
94+
#[deprecated(since = "44.0.0", note = "please use `RuntimeEnvBuilder` instead")]
95+
#[allow(deprecated)]
7496
pub fn try_new(config: RuntimeConfig) -> Result<Self> {
75-
let RuntimeConfig {
76-
memory_pool,
77-
disk_manager,
78-
cache_manager,
79-
object_store_registry,
80-
} = config;
81-
82-
let memory_pool =
83-
memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
84-
85-
Ok(Self {
86-
memory_pool,
87-
disk_manager: DiskManager::try_new(disk_manager)?,
88-
cache_manager: CacheManager::try_new(&cache_manager)?,
89-
object_store_registry,
90-
})
97+
config.build()
9198
}
9299

93100
/// Registers a custom `ObjectStore` to be used with a specific url.
@@ -104,7 +111,7 @@ impl RuntimeEnv {
104111
/// # use std::sync::Arc;
105112
/// # use url::Url;
106113
/// # use datafusion_execution::runtime_env::RuntimeEnv;
107-
/// # let runtime_env = RuntimeEnv::try_new(Default::default()).unwrap();
114+
/// # let runtime_env = RuntimeEnv::default();
108115
/// let url = Url::try_from("file://").unwrap();
109116
/// let object_store = object_store::local::LocalFileSystem::new();
110117
/// // register the object store with the runtime environment
@@ -119,11 +126,12 @@ impl RuntimeEnv {
119126
/// # use std::sync::Arc;
120127
/// # use url::Url;
121128
/// # use datafusion_execution::runtime_env::RuntimeEnv;
122-
/// # let runtime_env = RuntimeEnv::try_new(Default::default()).unwrap();
129+
/// # let runtime_env = RuntimeEnv::default();
123130
/// # // use local store for example as http feature is not enabled
124131
/// # let http_store = object_store::local::LocalFileSystem::new();
125132
/// // create a new object store via object_store::http::HttpBuilder;
126133
/// let base_url = Url::parse("https://github.com").unwrap();
134+
/// // (note this example can't depend on the http feature)
127135
/// // let http_store = HttpBuilder::new()
128136
/// // .with_url(base_url.clone())
129137
/// // .build()
@@ -155,12 +163,15 @@ impl Default for RuntimeEnv {
155163
}
156164
}
157165

158-
/// Please see: <https://github.com/apache/datafusion/issues/12156>
166+
/// Please see: <https://github.com/apache/datafusion/issues/12156a>
159167
/// This a type alias for backwards compatibility.
168+
#[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder` instead")]
160169
pub type RuntimeConfig = RuntimeEnvBuilder;
161170

162171
#[derive(Clone)]
163-
/// Execution runtime configuration
172+
/// Execution runtime configuration builder.
173+
///
174+
/// See example on [`RuntimeEnv`]
164175
pub struct RuntimeEnvBuilder {
165176
/// DiskManager to manage temporary disk file usage
166177
pub disk_manager: DiskManagerConfig,
@@ -239,15 +250,20 @@ impl RuntimeEnvBuilder {
239250

240251
/// Build a RuntimeEnv
241252
pub fn build(self) -> Result<RuntimeEnv> {
242-
let memory_pool = self
243-
.memory_pool
244-
.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
253+
let Self {
254+
disk_manager,
255+
memory_pool,
256+
cache_manager,
257+
object_store_registry,
258+
} = self;
259+
let memory_pool =
260+
memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
245261

246262
Ok(RuntimeEnv {
247263
memory_pool,
248-
disk_manager: DiskManager::try_new(self.disk_manager)?,
249-
cache_manager: CacheManager::try_new(&self.cache_manager)?,
250-
object_store_registry: self.object_store_registry,
264+
disk_manager: DiskManager::try_new(disk_manager)?,
265+
cache_manager: CacheManager::try_new(&cache_manager)?,
266+
object_store_registry,
251267
})
252268
}
253269

datafusion/execution/src/task.rs

+3-7
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@
1616
// under the License.
1717

1818
use crate::{
19-
config::SessionConfig,
20-
memory_pool::MemoryPool,
21-
registry::FunctionRegistry,
22-
runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
19+
config::SessionConfig, memory_pool::MemoryPool, registry::FunctionRegistry,
20+
runtime_env::RuntimeEnv,
2321
};
2422
use datafusion_common::{plan_datafusion_err, DataFusionError, Result};
2523
use datafusion_expr::planner::ExprPlanner;
@@ -54,9 +52,7 @@ pub struct TaskContext {
5452

5553
impl Default for TaskContext {
5654
fn default() -> Self {
57-
let runtime = RuntimeEnvBuilder::new()
58-
.build_arc()
59-
.expect("default runtime created successfully");
55+
let runtime = Arc::new(RuntimeEnv::default());
6056

6157
// Create a default task context, mostly useful for testing
6258
Self {

datafusion/physical-plan/src/aggregates/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1429,7 +1429,7 @@ mod tests {
14291429

14301430
fn new_spill_ctx(batch_size: usize, max_memory: usize) -> Arc<TaskContext> {
14311431
let session_config = SessionConfig::new().with_batch_size(batch_size);
1432-
let runtime = RuntimeEnvBuilder::default()
1432+
let runtime = RuntimeEnvBuilder::new()
14331433
.with_memory_pool(Arc::new(FairSpillPool::new(max_memory)))
14341434
.build_arc()
14351435
.unwrap();
@@ -1914,7 +1914,7 @@ mod tests {
19141914
let input: Arc<dyn ExecutionPlan> = Arc::new(TestYieldingExec::new(true));
19151915
let input_schema = input.schema();
19161916

1917-
let runtime = RuntimeEnvBuilder::default()
1917+
let runtime = RuntimeEnvBuilder::new()
19181918
.with_memory_limit(1, 1.0)
19191919
.build_arc()?;
19201920
let task_ctx = TaskContext::default().with_runtime(runtime);

datafusion/sql/src/planner.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
468468
|| matches!(tz_info, TimezoneInfo::WithTimeZone)
469469
{
470470
// Timestamp With Time Zone
471-
// INPUT : [SQLDataType] TimestampTz + [RuntimeConfig] Time Zone
471+
// INPUT : [SQLDataType] TimestampTz + [Config] Time Zone
472472
// OUTPUT: [ArrowDataType] Timestamp<TimeUnit, Some(Time Zone)>
473473
self.context_provider.options().execution.time_zone.clone()
474474
} else {

0 commit comments

Comments
 (0)