Skip to content

Commit 5c97dc5

Browse files
committed
feat: information.temporary_files
1 parent c2c43ec commit 5c97dc5

File tree

3 files changed

+150
-1
lines changed

3 files changed

+150
-1
lines changed

datafusion/catalog/src/information_schema.rs

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use async_trait::async_trait;
3131
use datafusion_common::config::{ConfigEntry, ConfigOptions};
3232
use datafusion_common::error::Result;
3333
use datafusion_common::DataFusionError;
34-
use datafusion_execution::TaskContext;
34+
use datafusion_execution::{disk_manager, TaskContext};
3535
use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF};
3636
use datafusion_expr::{TableType, Volatility};
3737
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
@@ -49,6 +49,7 @@ pub(crate) const DF_SETTINGS: &str = "df_settings";
4949
pub(crate) const SCHEMATA: &str = "schemata";
5050
pub(crate) const ROUTINES: &str = "routines";
5151
pub(crate) const PARAMETERS: &str = "parameters";
52+
pub(crate) const TEMPORARY_FILES: &str = "temporary_files";
5253

5354
/// All information schema tables
5455
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[
@@ -59,6 +60,7 @@ pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[
5960
SCHEMATA,
6061
ROUTINES,
6162
PARAMETERS,
63+
TEMPORARY_FILES,
6264
];
6365

6466
/// Implements the `information_schema` virtual schema and tables
@@ -397,6 +399,19 @@ impl InformationSchemaConfig {
397399
TypeSignature::Variadic(_) | TypeSignature::VariadicAny
398400
)
399401
}
402+
403+
fn make_temporary_files(
404+
&self,
405+
disk_manager: &disk_manager::DiskManager,
406+
builder: &mut InformationSchemaTemporaryFilesBuilder,
407+
) -> Result<(), DataFusionError> {
408+
let temporary_files = disk_manager.get_temporary_files();
409+
410+
for (path, size) in temporary_files {
411+
builder.add_file(&path, size);
412+
}
413+
Ok(())
414+
}
400415
}
401416

402417
/// get the arguments and return types of a UDF
@@ -496,6 +511,7 @@ impl SchemaProvider for InformationSchemaProvider {
496511
SCHEMATA => Arc::new(InformationSchemata::new(config)),
497512
ROUTINES => Arc::new(InformationSchemaRoutines::new(config)),
498513
PARAMETERS => Arc::new(InformationSchemaParameters::new(config)),
514+
TEMPORARY_FILES => Arc::new(InformationSchemaTemporaryFiles::new(config)),
499515
_ => return Ok(None),
500516
};
501517

@@ -1348,3 +1364,70 @@ impl PartitionStream for InformationSchemaParameters {
13481364
))
13491365
}
13501366
}
1367+
1368+
#[derive(Debug)]
1369+
struct InformationSchemaTemporaryFiles {
1370+
schema: SchemaRef,
1371+
config: InformationSchemaConfig,
1372+
}
1373+
1374+
impl InformationSchemaTemporaryFiles {
1375+
fn new(config: InformationSchemaConfig) -> Self {
1376+
let schema = Arc::new(Schema::new(vec![
1377+
Field::new("path", DataType::Utf8, false),
1378+
Field::new("size", DataType::UInt64, false),
1379+
]));
1380+
1381+
Self { schema, config }
1382+
}
1383+
1384+
fn builder(&self) -> InformationSchemaTemporaryFilesBuilder {
1385+
InformationSchemaTemporaryFilesBuilder {
1386+
paths: StringBuilder::new(),
1387+
sizes: UInt64Builder::new(),
1388+
schema: Arc::clone(&self.schema),
1389+
}
1390+
}
1391+
}
1392+
1393+
impl PartitionStream for InformationSchemaTemporaryFiles {
1394+
fn schema(&self) -> &SchemaRef {
1395+
&self.schema
1396+
}
1397+
1398+
fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1399+
let mut builder = self.builder();
1400+
let config = self.config.clone();
1401+
Box::pin(RecordBatchStreamAdapter::new(
1402+
Arc::clone(&self.schema),
1403+
futures::stream::once(async move {
1404+
config.make_temporary_files(&ctx.runtime_env().disk_manager, &mut builder)?;
1405+
Ok(builder.finish())
1406+
}),
1407+
))
1408+
}
1409+
}
1410+
1411+
struct InformationSchemaTemporaryFilesBuilder {
1412+
schema: SchemaRef,
1413+
paths: StringBuilder,
1414+
sizes: UInt64Builder,
1415+
}
1416+
1417+
impl InformationSchemaTemporaryFilesBuilder {
1418+
fn add_file(&mut self, path: &str, size: u64) {
1419+
self.paths.append_value(path);
1420+
self.sizes.append_value(size);
1421+
}
1422+
1423+
fn finish(&mut self) -> RecordBatch {
1424+
RecordBatch::try_new(
1425+
Arc::clone(&self.schema),
1426+
vec![
1427+
Arc::new(self.paths.finish()),
1428+
Arc::new(self.sizes.finish()),
1429+
],
1430+
)
1431+
.unwrap()
1432+
}
1433+
}

datafusion/execution/src/disk_manager.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,31 @@ impl DiskManager {
144144
.map_err(DataFusionError::IoError)?,
145145
})
146146
}
147+
148+
pub fn get_temporary_files(&self) -> Vec<(String, u64)> {
149+
let mut files = Vec::new();
150+
151+
let guard = self.local_dirs.lock();
152+
if let Some(local_dirs) = guard.as_ref() {
153+
local_dirs.iter().flat_map(|temp_dir| {
154+
std::fs::read_dir(temp_dir.path()).into_iter().flat_map(|entries| {
155+
entries.flatten().filter_map(|entry| {
156+
let path = entry.path();
157+
entry.metadata().ok().filter(|metadata| metadata.is_file()).map(|metadata| {
158+
(path.to_string_lossy().to_string(), metadata.len())
159+
})
160+
})
161+
})
162+
}).for_each(|file| files.push(file));
163+
}
164+
165+
// test data
166+
files.push(("/tmp/file1.tmp".to_string(), 1024));
167+
files.push(("/tmp/file2.tmp".to_string(), 2048));
168+
169+
files
170+
}
171+
147172
}
148173

149174
/// A wrapper around a [`NamedTempFile`] that also contains
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
19+
# Verify the information schema does not exit by default
20+
statement error DataFusion error: Error during planning: table 'datafusion.information_schema.tables' not found
21+
SELECT * from information_schema.tables
22+
23+
# Verify the information schema does not exit by default
24+
statement error DataFusion error: Error during planning: table 'datafusion.information_schema.schemata' not found
25+
SELECT * from information_schema.schemata
26+
27+
statement error DataFusion error: Error during planning: SHOW \[VARIABLE\] is not supported unless information_schema is enabled
28+
show all
29+
30+
# Turn it on
31+
32+
# expect that the queries now work
33+
statement ok
34+
set datafusion.catalog.information_schema = true;
35+
36+
# Check information_schema.temporary_files
37+
query TI
38+
SELECT * FROM information_schema.temporary_files;
39+
----
40+
/tmp/file1.tmp 1024
41+
/tmp/file2.tmp 2048

0 commit comments

Comments
 (0)