176 changes: 176 additions & 0 deletions crates/core/src/delta_datafusion/engine/file_formats.rs
@@ -0,0 +1,176 @@
use std::sync::Arc;

use dashmap::{mapref::one::Ref, DashMap};
use datafusion::execution::{
object_store::{ObjectStoreRegistry, ObjectStoreUrl},
TaskContext,
};
use delta_kernel::engine::parse_json as arrow_parse_json;
use delta_kernel::{
engine::default::{
executor::tokio::{TokioBackgroundExecutor, TokioMultiThreadExecutor},
json::DefaultJsonHandler,
parquet::DefaultParquetHandler,
},
error::DeltaResult as KernelResult,
schema::SchemaRef,
EngineData, FileDataReadResultIterator, FileMeta, JsonHandler, ParquetHandler, PredicateRef,
};
use itertools::Itertools;
use tokio::runtime::{Handle, RuntimeFlavor};

use super::storage::{group_by_store, AsObjectStoreUrl};

#[derive(Clone)]
pub struct DataFusionFileFormatHandler {
ctx: Arc<TaskContext>,
pq_registry: Arc<DashMap<ObjectStoreUrl, Arc<dyn ParquetHandler>>>,
json_registry: Arc<DashMap<ObjectStoreUrl, Arc<dyn JsonHandler>>>,
handle: Handle,
}

impl DataFusionFileFormatHandler {
/// Create a new [`DataFusionFileFormatHandler`] instance.
pub fn new(ctx: Arc<TaskContext>, handle: Handle) -> Self {
Self {
ctx,
pq_registry: DashMap::new().into(),
json_registry: DashMap::new().into(),
handle,
}
}

fn registry(&self) -> Arc<dyn ObjectStoreRegistry> {
self.ctx.runtime_env().object_store_registry.clone()
}

fn get_or_create_pq(
&self,
url: ObjectStoreUrl,
) -> KernelResult<Ref<'_, ObjectStoreUrl, Arc<dyn ParquetHandler>>> {
if let Some(handler) = self.pq_registry.get(&url) {
return Ok(handler);
}
let store = self
.registry()
.get_store(url.as_ref())
.map_err(delta_kernel::Error::generic_err)?;

let handler: Arc<dyn ParquetHandler> = match self.handle.runtime_flavor() {
RuntimeFlavor::MultiThread => Arc::new(DefaultParquetHandler::new(
store,
Arc::new(TokioMultiThreadExecutor::new(self.handle.clone())),
)),
RuntimeFlavor::CurrentThread => Arc::new(DefaultParquetHandler::new(
store,
Arc::new(TokioBackgroundExecutor::new()),
)),
_ => panic!("unsupported runtime flavor"),
};

self.pq_registry.insert(url.clone(), handler);
Ok(self.pq_registry.get(&url).unwrap())
}

fn get_or_create_json(
&self,
url: ObjectStoreUrl,
) -> KernelResult<Ref<'_, ObjectStoreUrl, Arc<dyn JsonHandler>>> {
if let Some(handler) = self.json_registry.get(&url) {
return Ok(handler);
}
let store = self
.registry()
.get_store(url.as_ref())
.map_err(delta_kernel::Error::generic_err)?;

let handler: Arc<dyn JsonHandler> = match self.handle.runtime_flavor() {
RuntimeFlavor::MultiThread => Arc::new(DefaultJsonHandler::new(
store,
Arc::new(TokioMultiThreadExecutor::new(self.handle.clone())),
)),
RuntimeFlavor::CurrentThread => Arc::new(DefaultJsonHandler::new(
store,
Arc::new(TokioBackgroundExecutor::new()),
)),
_ => panic!("unsupported runtime flavor"),
};
Review comment on lines +87 to +97 (Collaborator Author):
We replicate this logic inside the functions to keep the generic parameter from spilling out into our implementations. It tends to bubble up quite quickly, since TaskExecutor is not dyn-compatible.

Once we move to DataFusion-specific implementations, these will no longer be generic over the runtime, since both DataFusion and we are tied to Tokio.
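For illustration only, a hypothetical sketch (not part of the PR; the struct name is invented and the `TaskExecutor` import path is an assumption) of the alternative this avoids: holding the executor directly forces a generic parameter onto every containing type.

use std::sync::Arc;
use delta_kernel::engine::default::executor::TaskExecutor;

// Hypothetical alternative: `TaskExecutor` is not dyn-compatible, so the
// executor cannot be stored as `Arc<dyn TaskExecutor>`; the concrete type
// must appear as a generic parameter on the struct...
struct GenericFileFormatHandler<E: TaskExecutor> {
    executor: Arc<E>,
}
// ...and `E` then bubbles up into `DataFusionEngine<E>`, its `Engine` impl,
// and every caller. Matching on the runtime flavor inside each function and
// erasing immediately to `Arc<dyn ParquetHandler>` keeps `E` fully local.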


self.json_registry.insert(url.clone(), handler);
Ok(self.json_registry.get(&url).unwrap())
}
}

impl ParquetHandler for DataFusionFileFormatHandler {
fn read_parquet_files(
&self,
files: &[FileMeta],
physical_schema: SchemaRef,
predicate: Option<PredicateRef>,
) -> KernelResult<FileDataReadResultIterator> {
let grouped_files = group_by_store(files.to_vec());
Ok(Box::new(
grouped_files
.into_iter()
.map(|(url, files)| {
self.get_or_create_pq(url)?.read_parquet_files(
&files,
physical_schema.clone(),
predicate.clone(),
)
})
// TODO: this should not perform any blocking operations: the actual reads
// happen when the iterators are polled, and here we are only creating a
// vec of iterators. Is this correct?
.try_collect::<_, Vec<_>, _>()?
.into_iter()
.flatten(),
Review comment on lines +112 to +127 (Collaborator Author):

Would love some feedback on whether I made the right assumptions here; see the sketch after this impl block. We could also handle this in the map, but that gets a bit messy...

))
}
}
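Regarding the TODO above, a self-contained sketch (illustrative only, not part of the PR) of the assumed semantics: collecting the Vec drives the outer map eagerly, which is where handler creation happens, while the inner iterators do their work only when polled, which is where the file reads would happen.

fn main() {
    // The outer `map` runs eagerly during `collect()`, analogous to
    // creating the per-store handlers up front...
    let readers: Vec<_> = (0..2)
        .map(|i| {
            println!("creating reader {i}");
            // ...while the inner iterator, analogous to the file reads,
            // only does work once it is polled.
            std::iter::once(i).inspect(|i| println!("reading item {i}"))
        })
        .collect();
    println!("nothing read yet");
    for _ in readers.into_iter().flatten() {}
    // Prints: creating reader 0, creating reader 1, nothing read yet,
    // reading item 0, reading item 1.
}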

impl JsonHandler for DataFusionFileFormatHandler {
fn parse_json(
&self,
json_strings: Box<dyn EngineData>,
output_schema: SchemaRef,
) -> KernelResult<Box<dyn EngineData>> {
arrow_parse_json(json_strings, output_schema)
}

fn read_json_files(
&self,
files: &[FileMeta],
physical_schema: SchemaRef,
predicate: Option<PredicateRef>,
) -> KernelResult<FileDataReadResultIterator> {
let grouped_files = group_by_store(files.to_vec());
Ok(Box::new(
grouped_files
.into_iter()
.map(|(url, files)| {
self.get_or_create_json(url)?.read_json_files(
&files,
physical_schema.clone(),
predicate.clone(),
)
})
// TODO: this should not perform any blocking operations: the actual reads
// happen when the iterators are polled, and here we are only creating a
// vec of iterators. Is this correct?
.try_collect::<_, Vec<_>, _>()?
.into_iter()
.flatten(),
))
}

fn write_json_file(
&self,
path: &url::Url,
data: Box<dyn Iterator<Item = KernelResult<Box<dyn EngineData>>> + Send + '_>,
overwrite: bool,
) -> KernelResult<()> {
self.get_or_create_json(path.as_object_store_url())?
.write_json_file(path, data, overwrite)
}
}
50 changes: 50 additions & 0 deletions crates/core/src/delta_datafusion/engine/mod.rs
@@ -0,0 +1,50 @@
use std::sync::Arc;

use datafusion::catalog::Session;
use datafusion::execution::TaskContext;
use delta_kernel::{Engine, EvaluationHandler, JsonHandler, ParquetHandler, StorageHandler};
use tokio::runtime::Handle;

use self::file_formats::DataFusionFileFormatHandler;
use self::storage::DataFusionStorageHandler;
use crate::kernel::ARROW_HANDLER;

mod file_formats;
mod storage;

/// A DataFusion-based kernel [`Engine`].
#[derive(Clone)]
pub struct DataFusionEngine {
storage: Arc<DataFusionStorageHandler>,
formats: Arc<DataFusionFileFormatHandler>,
}

impl DataFusionEngine {
pub fn new_from_session(session: &dyn Session) -> Arc<Self> {
Self::new(session.task_ctx(), Handle::current()).into()
}

pub fn new(ctx: Arc<TaskContext>, handle: Handle) -> Self {
let storage = Arc::new(DataFusionStorageHandler::new(ctx.clone(), handle.clone()));
let formats = Arc::new(DataFusionFileFormatHandler::new(ctx, handle));
Self { storage, formats }
}
}

impl Engine for DataFusionEngine {
fn evaluation_handler(&self) -> Arc<dyn EvaluationHandler> {
ARROW_HANDLER.clone()
}

fn storage_handler(&self) -> Arc<dyn StorageHandler> {
self.storage.clone()
}

fn json_handler(&self) -> Arc<dyn JsonHandler> {
self.formats.clone()
}

fn parquet_handler(&self) -> Arc<dyn ParquetHandler> {
self.formats.clone()
}
}
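For illustration, a minimal usage sketch (hypothetical, not part of the PR; the import path for DataFusionEngine is an assumption, and Handle::current() inside new_from_session requires running on a Tokio runtime):

use datafusion::prelude::SessionContext;
use delta_kernel::Engine;
// Assumed re-export path; adjust to wherever the crate exposes the engine.
use deltalake_core::delta_datafusion::engine::DataFusionEngine;

#[tokio::main]
async fn main() {
    let ctx = SessionContext::new();
    // `SessionState` implements the `Session` trait the constructor expects.
    let state = ctx.state();
    let engine = DataFusionEngine::new_from_session(&state);
    // The engine satisfies `delta_kernel::Engine`, so kernel APIs can use
    // its storage, JSON, and parquet handlers.
    let _parquet = engine.parquet_handler();
    let _json = engine.json_handler();
}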