Skip to content

Commit 6391aab

Browse files
committed
feat: datafusion_temporary_file()
1 parent 9071503 commit 6391aab

File tree

7 files changed

+153
-2
lines changed

7 files changed

+153
-2
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/core/src/execution/session_state.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1036,7 +1036,6 @@ impl SessionStateBuilder {
10361036
.into_iter()
10371037
.map(|f| (f.name().to_string(), f)),
10381038
);
1039-
10401039
self
10411040
}
10421041

datafusion/execution/src/disk_manager.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,42 @@ impl DiskManager {
144144
.map_err(DataFusionError::IoError)?,
145145
})
146146
}
147+
148+
pub fn get_temporary_files(&self) -> Vec<(String, u64)> {
149+
let mut files = Vec::new();
150+
151+
let guard = self.local_dirs.lock();
152+
if let Some(local_dirs) = guard.as_ref() {
153+
local_dirs
154+
.iter()
155+
.flat_map(|temp_dir| {
156+
std::fs::read_dir(temp_dir.path())
157+
.into_iter()
158+
.flat_map(|entries| {
159+
entries.flatten().filter_map(|entry| {
160+
let path = entry.path();
161+
entry
162+
.metadata()
163+
.ok()
164+
.filter(|metadata| metadata.is_file())
165+
.map(|metadata| {
166+
(
167+
path.to_string_lossy().to_string(),
168+
metadata.len(),
169+
)
170+
})
171+
})
172+
})
173+
})
174+
.for_each(|file| files.push(file));
175+
}
176+
177+
// test data
178+
files.push(("/tmp/file1.tmp".to_string(), 1024));
179+
files.push(("/tmp/file2.tmp".to_string(), 2048));
180+
181+
files
182+
}
147183
}
148184

149185
/// A wrapper around a [`NamedTempFile`] that also contains

datafusion/functions-table/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ datafusion-catalog = { workspace = true }
4444
datafusion-common = { workspace = true }
4545
datafusion-expr = { workspace = true }
4646
datafusion-physical-plan = { workspace = true }
47+
datafusion-execution ={ workspace = true }
48+
datafusion-datasource = { workspace = true }
4749
parking_lot = { workspace = true }
4850
paste = "1.0.14"
4951

datafusion/functions-table/src/lib.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,14 @@
2525
#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]
2626

2727
pub mod generate_series;
28+
pub mod memory_profile;
2829

2930
use datafusion_catalog::TableFunction;
3031
use std::sync::Arc;
3132

3233
/// Returns all default table functions
3334
pub fn all_default_table_functions() -> Vec<Arc<TableFunction>> {
34-
vec![generate_series(), range()]
35+
vec![generate_series(), range(), datafusion_temporary_files()]
3536
}
3637

3738
/// Creates a singleton instance of a table function
@@ -59,3 +60,7 @@ macro_rules! create_udtf_function {
5960

6061
create_udtf_function!(generate_series::GenerateSeriesFunc, "generate_series");
6162
create_udtf_function!(generate_series::RangeFunc, "range");
63+
create_udtf_function!(
64+
memory_profile::TemporaryFilesFunc,
65+
"datafusion_temporary_files"
66+
);
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::{StringArray, UInt64Array};
19+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
20+
use arrow::record_batch::RecordBatch;
21+
use async_trait::async_trait;
22+
use datafusion_catalog::{Session, TableFunctionImpl, TableProvider};
23+
use datafusion_common::Result;
24+
use datafusion_datasource::memory::MemorySourceConfig;
25+
use datafusion_expr::{Expr, TableType};
26+
use datafusion_physical_plan::ExecutionPlan;
27+
use std::sync::Arc;
28+
29+
#[derive(Debug, Clone)]
30+
struct TemporaryFilesTable {
31+
schema: SchemaRef,
32+
}
33+
34+
#[async_trait]
35+
impl TableProvider for TemporaryFilesTable {
36+
fn as_any(&self) -> &dyn std::any::Any {
37+
self
38+
}
39+
40+
fn schema(&self) -> SchemaRef {
41+
Arc::clone(&self.schema)
42+
}
43+
44+
fn table_type(&self) -> TableType {
45+
TableType::Base
46+
}
47+
48+
async fn scan(
49+
&self,
50+
state: &dyn Session,
51+
projection: Option<&Vec<usize>>,
52+
_filters: &[Expr],
53+
_limit: Option<usize>,
54+
) -> Result<Arc<dyn ExecutionPlan>> {
55+
let runtime_env = state.runtime_env();
56+
let temporary_files = runtime_env.disk_manager.get_temporary_files();
57+
let paths = temporary_files
58+
.iter()
59+
.map(|file| file.0.clone())
60+
.collect::<Vec<_>>();
61+
let sizes = temporary_files
62+
.iter()
63+
.map(|file| file.1)
64+
.collect::<Vec<_>>();
65+
let batches = vec![RecordBatch::try_new(
66+
Arc::clone(&self.schema),
67+
vec![
68+
Arc::new(StringArray::from(paths)),
69+
Arc::new(UInt64Array::from(sizes)),
70+
],
71+
)?];
72+
73+
Ok(MemorySourceConfig::try_new_exec(
74+
&[batches],
75+
TableProvider::schema(self),
76+
projection.cloned(),
77+
)?)
78+
}
79+
}
80+
81+
/// A table function that returns temporary files with their paths and sizes
82+
#[derive(Debug)]
83+
pub struct TemporaryFilesFunc {}
84+
85+
impl TableFunctionImpl for TemporaryFilesFunc {
86+
fn call(&self, _exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
87+
// Create the schema for the table
88+
let schema = Arc::new(Schema::new(vec![
89+
Field::new("path", DataType::Utf8, false),
90+
Field::new("size", DataType::UInt64, false),
91+
]));
92+
93+
// Create a MemTable plan that returns the RecordBatch
94+
let provider = TemporaryFilesTable {
95+
schema,
96+
};
97+
98+
Ok(Arc::new(provider))
99+
}
100+
}

datafusion/sqllogictest/test_files/table_functions.slt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,3 +303,10 @@ SELECT range(1, t1.end) FROM range(3, 5) as t1(end)
303303
----
304304
[1, 2, 3]
305305
[1, 2]
306+
307+
# test
308+
query TI
309+
SELECT * FROM datafusion_temporary_files()
310+
----
311+
/tmp/file1.tmp 1024
312+
/tmp/file2.tmp 2048

0 commit comments

Comments
 (0)