diff --git a/Cargo.lock b/Cargo.lock index 403350e1..e84b56db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -489,9 +489,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.82" +version = "0.1.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1" +checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index c2c706db..39b2a2dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ default-run = "dft" [dependencies] arrow-flight = { version = "52.2.0", features = ["flight-sql-experimental"] , optional = true } -async-trait = "0.1.80" +async-trait = "0.1.83" clap = { version = "4.5.1", features = ["derive"] } color-eyre = "0.6.3" crossterm = { version = "0.28.1", features = ["event-stream"] } diff --git a/src/extensions/s3.rs b/src/extensions/s3.rs index 377bd449..fa92bc83 100644 --- a/src/extensions/s3.rs +++ b/src/extensions/s3.rs @@ -19,6 +19,7 @@ use crate::config::ExecutionConfig; use crate::extensions::{DftSessionStateBuilder, Extension}; +use crate::object_stores::io_object_store::IoObjectStore; use log::info; use std::sync::Arc; @@ -56,9 +57,10 @@ impl Extension for AwsS3Extension { info!("Endpoint exists"); if let Ok(parsed_endpoint) = Url::parse(object_store_url) { info!("Parsed endpoint"); + let io_store = IoObjectStore::new(Arc::new(object_store)); builder .runtime_env() - .register_object_store(&parsed_endpoint, Arc::new(object_store)); + .register_object_store(&parsed_endpoint, Arc::new(io_store)); info!("Registered s3 object store"); } } diff --git a/src/lib.rs b/src/lib.rs index 4b8c9256..5f7c4a1c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ pub mod execution; pub mod extensions; #[cfg(feature = "experimental-flightsql-server")] pub mod flightsql_server; +pub mod object_stores; pub mod telemetry; pub mod test_utils; pub mod tui; 
diff --git a/src/object_stores/io_object_store.rs b/src/object_stores/io_object_store.rs
new file mode 100644
index 00000000..6efd6936
--- /dev/null
+++ b/src/object_stores/io_object_store.rs
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use futures::stream::BoxStream;
+use object_store::{
+    path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
+    PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
+};
+
+use crate::execution::executor::io::spawn_io;
+
+/// [`ObjectStore`] that wraps an inner `ObjectStore` and runs its async methods through
+/// [`spawn_io`] so that they execute on the Tokio runtime dedicated to doing IO.
+///
+/// [`ObjectStore::list`] is the exception: it is forwarded straight to the inner store and
+/// does not go through [`spawn_io`].
+#[derive(Debug)]
+pub struct IoObjectStore {
+    /// The wrapped store that every call is delegated to.
+    inner: Arc<dyn ObjectStore>,
+}
+
+impl IoObjectStore {
+    /// Creates an `IoObjectStore` that delegates every operation to `object_store`.
+    pub fn new(object_store: Arc<dyn ObjectStore>) -> Self {
+        Self {
+            inner: object_store,
+        }
+    }
+}
+
+impl std::fmt::Display for IoObjectStore {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // `write!` rather than `writeln!`: a `Display` impl should not append a newline.
+        write!(f, "IoObjectStore")
+    }
+}
+
+#[async_trait]
+impl ObjectStore for IoObjectStore {
+    /// Fetches `location` from the inner store with `options`, via [`spawn_io`].
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+        let location = location.clone();
+        let store = Arc::clone(&self.inner);
+        spawn_io(async move { store.get_opts(&location, options).await }).await
+    }
+
+    /// Copies `from` to `to` in the inner store, via [`spawn_io`].
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+        let from = from.clone();
+        let to = to.clone();
+        let store = Arc::clone(&self.inner);
+        spawn_io(async move { store.copy(&from, &to).await }).await
+    }
+
+    /// Copies `from` to `to` via [`spawn_io`], delegating to the inner store's
+    /// `copy_if_not_exists` so that its no-overwrite behaviour is preserved.
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        let from = from.clone();
+        let to = to.clone();
+        let store = Arc::clone(&self.inner);
+        // Must call `copy_if_not_exists`, not `copy`: a plain copy would overwrite `to`.
+        spawn_io(async move { store.copy_if_not_exists(&from, &to).await }).await
+    }
+
+    /// Deletes `location` from the inner store, via [`spawn_io`].
+    async fn delete(&self, location: &Path) -> Result<()> {
+        let location = location.clone();
+        let store = Arc::clone(&self.inner);
+        spawn_io(async move { store.delete(&location).await }).await
+    }
+
+    /// Lists objects under `prefix` by calling the inner store's `list` directly.
+    /// Unlike the other methods, this one is not routed through [`spawn_io`].
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
+        self.inner.list(prefix)
+    }
+
+    /// Runs the inner store's `list_with_delimiter` for `prefix`, via [`spawn_io`].
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+        let prefix = prefix.cloned();
+        let store = Arc::clone(&self.inner);
+        spawn_io(async move { store.list_with_delimiter(prefix.as_ref()).await }).await
+    }
+
+    /// Starts a multipart upload to `location` on the inner store, via [`spawn_io`].
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOpts,
+    ) -> Result<Box<dyn MultipartUpload>> {
+        let location = location.clone();
+        let store = Arc::clone(&self.inner);
+        spawn_io(async move { store.put_multipart_opts(&location, opts).await }).await
+    }
+
+    /// Writes `payload` to `location` on the inner store with `opts`, via [`spawn_io`].
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
+        let location = location.clone();
+        let store = Arc::clone(&self.inner);
+        spawn_io(async move { store.put_opts(&location, payload, opts).await }).await
+    }
+}
diff --git a/src/object_stores/mod.rs b/src/object_stores/mod.rs
new file mode 100644
index 00000000..705051cd
--- /dev/null
+++ b/src/object_stores/mod.rs
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#[cfg(feature = "s3")]
+pub mod io_object_store;