Skip to content

Commit e5649fc

Browse files
authored
Allow configuring different readers for DF integration (vortex-data#6027)
This PR allows using non object-store based readers when working with the Vortex `FileSource`, like when users want custom caching, observability or anything else. --------- Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent edeef00 commit e5649fc

6 files changed

Lines changed: 105 additions & 17 deletions

File tree

vortex-datafusion/src/persistent/cache.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use chrono::Utc;
99
use datafusion_common::ScalarValue;
1010
use moka::future::Cache;
1111
use object_store::ObjectMeta;
12-
use object_store::ObjectStore;
1312
use object_store::path::Path;
1413
use vortex::buffer::ByteBuffer;
1514
use vortex::dtype::DType;
@@ -22,6 +21,7 @@ use vortex::file::Footer;
2221
use vortex::file::OpenOptionsSessionExt;
2322
use vortex::file::SegmentSpec;
2423
use vortex::file::VortexFile;
24+
use vortex::io::VortexReadAt;
2525
use vortex::layout::segments::SegmentCache;
2626
use vortex::layout::segments::SegmentId;
2727
use vortex::metrics::MetricsSessionExt;
@@ -100,7 +100,7 @@ impl VortexFileCache {
100100
pub async fn try_get(
101101
&self,
102102
object: &ObjectMeta,
103-
object_store: Arc<dyn ObjectStore>,
103+
reader: Arc<dyn VortexReadAt>,
104104
) -> VortexResult<VortexFile> {
105105
let file_key = FileKey::from(object);
106106
self.file_cache
@@ -119,7 +119,7 @@ impl VortexFileCache {
119119
file_key,
120120
segment_cache: self.segment_cache.clone(),
121121
}))
122-
.open_object_store(&object_store, object.location.as_ref()),
122+
.open(reader),
123123
)
124124
.await
125125
.map_err(|e: Arc<VortexError>| {

vortex-datafusion/src/persistent/format.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ use vortex::expr::stats::Stat;
5555
use vortex::file::EOF_SIZE;
5656
use vortex::file::MAX_POSTSCRIPT_SIZE;
5757
use vortex::file::VORTEX_FILE_EXTENSION;
58+
use vortex::io::file::object_store::ObjectStoreSource;
59+
use vortex::io::session::RuntimeSessionExt;
5860
use vortex::scalar::Scalar;
5961
use vortex::session::VortexSession;
6062

@@ -243,10 +245,14 @@ impl FileFormat for VortexFormat {
243245
) -> DFResult<SchemaRef> {
244246
let mut file_schemas = stream::iter(objects.iter().cloned())
245247
.map(|o| {
246-
let store = store.clone();
248+
let reader = Arc::new(ObjectStoreSource::new(
249+
store.clone(),
250+
o.location.clone(),
251+
self.session.handle(),
252+
));
247253
let cache = self.file_cache.clone();
248254
SpawnedTask::spawn(async move {
249-
let vxf = cache.try_get(&o, store).await?;
255+
let vxf = cache.try_get(&o, reader).await?;
250256
let inferred_schema = vxf.dtype().to_arrow_schema()?;
251257
VortexResult::Ok((o.location, inferred_schema))
252258
})
@@ -275,9 +281,15 @@ impl FileFormat for VortexFormat {
275281
let object = object.clone();
276282
let store = store.clone();
277283
let cache = self.file_cache.clone();
284+
let handle = self.session.handle();
278285

279286
SpawnedTask::spawn(async move {
280-
let vxf = cache.try_get(&object, store.clone()).await.map_err(|e| {
287+
let reader = Arc::new(ObjectStoreSource::new(
288+
store.clone(),
289+
object.location.clone(),
290+
handle,
291+
));
292+
let vxf = cache.try_get(&object, reader).await.map_err(|e| {
281293
DataFusionError::Execution(format!(
282294
"Failed to open Vortex file {}: {e}",
283295
object.location

vortex-datafusion/src/persistent/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod cache;
77
mod format;
88
pub mod metrics;
99
mod opener;
10+
mod reader;
1011
mod sink;
1112
mod source;
1213
mod stream;
@@ -15,6 +16,8 @@ pub use access_plan::VortexAccessPlan;
1516
pub use format::VortexFormat;
1617
pub use format::VortexFormatFactory;
1718
pub use format::VortexOptions;
19+
pub use reader::DefaultVortexReaderFactory;
20+
pub use reader::VortexReaderFactory;
1821
pub use source::VortexSource;
1922

2023
#[cfg(test)]

vortex-datafusion/src/persistent/opener.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ use futures::FutureExt;
2828
use futures::StreamExt;
2929
use futures::TryStreamExt;
3030
use futures::stream;
31-
use object_store::ObjectStore;
3231
use object_store::path::Path;
3332
use tracing::Instrument;
3433
use vortex::array::ArrayRef;
@@ -44,6 +43,7 @@ use vortex_utils::aliases::dash_map::Entry;
4443

4544
use super::cache::VortexFileCache;
4645
use crate::VortexAccessPlan;
46+
use crate::VortexReaderFactory;
4747
use crate::convert::exprs::ExpressionConvertor;
4848
use crate::convert::exprs::ProcessedProjection;
4949
use crate::convert::exprs::make_vortex_predicate;
@@ -53,7 +53,7 @@ use crate::persistent::stream::PrunableStream;
5353
#[derive(Clone)]
5454
pub(crate) struct VortexOpener {
5555
pub session: VortexSession,
56-
pub object_store: Arc<dyn ObjectStore>,
56+
pub vortex_reader_factory: Arc<dyn VortexReaderFactory>,
5757
/// Optional table schema projection. The indices are w.r.t. the `table_schema`, which is
5858
/// all fields in the final scan result not including the partition columns.
5959
pub projection: ProjectionExprs,
@@ -90,11 +90,14 @@ pub(crate) struct VortexOpener {
9090
impl FileOpener for VortexOpener {
9191
fn open(&self, file: PartitionedFile) -> DFResult<FileOpenFuture> {
9292
let session = self.session.clone();
93-
let object_store = self.object_store.clone();
9493

9594
let mut projection = self.projection.clone();
9695
let mut filter = self.filter.clone();
9796

97+
let reader = self
98+
.vortex_reader_factory
99+
.create_reader(file.path().as_ref(), &session)?;
100+
98101
let file_pruning_predicate = self.file_pruning_predicate.clone();
99102
let expr_adapter_factory = self.expr_adapter_factory.clone();
100103

@@ -158,7 +161,7 @@ impl FileOpener for VortexOpener {
158161
}
159162

160163
let vxf = file_cache
161-
.try_get(&file.object_meta, object_store)
164+
.try_get(&file.object_meta, reader)
162165
.await
163166
.map_err(|e| exec_datafusion_err!("Failed to open Vortex file {e}"))?;
164167

@@ -382,6 +385,7 @@ fn byte_range_to_row_range(byte_range: Range<u64>, row_count: u64, total_size: u
382385

383386
#[cfg(test)]
384387
mod tests {
388+
use std::sync::Arc;
385389
use std::sync::LazyLock;
386390

387391
use arrow_schema::Field;
@@ -407,6 +411,7 @@ mod tests {
407411
use datafusion_physical_expr::projection::ProjectionExpr;
408412
use insta::assert_snapshot;
409413
use itertools::Itertools;
414+
use object_store::ObjectStore;
410415
use object_store::memory::InMemory;
411416
use rstest::rstest;
412417
use vortex::VortexSessionDefault;
@@ -419,6 +424,7 @@ mod tests {
419424
use vortex::session::VortexSession;
420425

421426
use super::*;
427+
use crate::DefaultVortexReaderFactory;
422428
use crate::VortexAccessPlan;
423429
use crate::convert::exprs::DefaultExpressionConvertor;
424430

@@ -491,7 +497,7 @@ mod tests {
491497
) -> VortexOpener {
492498
VortexOpener {
493499
session: SESSION.clone(),
494-
object_store,
500+
vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store)),
495501
projection: ProjectionExprs::from_indices(&[0], table_schema.file_schema()),
496502
filter,
497503
file_pruning_predicate: None,
@@ -583,7 +589,7 @@ mod tests {
583589

584590
let make_opener = |filter| VortexOpener {
585591
session: SESSION.clone(),
586-
object_store: object_store.clone(),
592+
vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store.clone())),
587593
projection: ProjectionExprs::from_indices(&[0], table_schema.file_schema()),
588594
filter: Some(filter),
589595
file_pruning_predicate: None,
@@ -666,7 +672,7 @@ mod tests {
666672

667673
let opener = VortexOpener {
668674
session: SESSION.clone(),
669-
object_store: object_store.clone(),
675+
vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store)),
670676
projection: ProjectionExprs::from_indices(&[0, 1, 2], &table_schema),
671677
filter: None,
672678
file_pruning_predicate: None,
@@ -815,7 +821,7 @@ mod tests {
815821

816822
let opener = VortexOpener {
817823
session: SESSION.clone(),
818-
object_store: object_store.clone(),
824+
vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store.clone())),
819825
projection: ProjectionExprs::from_indices(
820826
projection.as_ref(),
821827
table_schema.file_schema(),
@@ -874,7 +880,7 @@ mod tests {
874880
) -> VortexOpener {
875881
VortexOpener {
876882
session: SESSION.clone(),
877-
object_store,
883+
vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store)),
878884
projection,
879885
filter: None,
880886
file_pruning_predicate: None,
@@ -1072,7 +1078,7 @@ mod tests {
10721078

10731079
let opener = VortexOpener {
10741080
session: SESSION.clone(),
1075-
object_store: object_store.clone(),
1081+
vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store.clone())),
10761082
projection,
10771083
filter: None,
10781084
file_pruning_predicate: None,
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::fmt::Debug;
5+
use std::sync::Arc;
6+
7+
use datafusion_common::Result as DFResult;
8+
use object_store::ObjectStore;
9+
use vortex::io::VortexReadAt;
10+
use vortex::io::file::object_store::ObjectStoreSource;
11+
use vortex::io::session::RuntimeSessionExt;
12+
use vortex::session::VortexSession;
13+
14+
/// Factory to create [`VortexReadAt`] instances to read the target file.
15+
pub trait VortexReaderFactory: Debug + Send + Sync + 'static {
16+
/// Create a reader for a target object.
17+
fn create_reader(&self, path: &str, session: &VortexSession)
18+
-> DFResult<Arc<dyn VortexReadAt>>;
19+
}
20+
21+
/// Default factory, creates [`ObjectStore`] backed readers for files,
22+
/// works with multiple cloud providers.
23+
#[derive(Debug)]
24+
pub struct DefaultVortexReaderFactory {
25+
object_store: Arc<dyn ObjectStore>,
26+
}
27+
28+
impl DefaultVortexReaderFactory {
29+
/// Creates new instance
30+
pub fn new(object_store: Arc<dyn ObjectStore>) -> Self {
31+
Self { object_store }
32+
}
33+
}
34+
35+
impl VortexReaderFactory for DefaultVortexReaderFactory {
36+
fn create_reader(
37+
&self,
38+
path: &str,
39+
session: &VortexSession,
40+
) -> DFResult<Arc<dyn VortexReadAt>> {
41+
Ok(Arc::new(ObjectStoreSource::new(
42+
self.object_store.clone(),
43+
path.into(),
44+
session.handle(),
45+
)) as _)
46+
}
47+
}

vortex-datafusion/src/persistent/source.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ use vortex_utils::aliases::dash_map::DashMap;
3535
use super::cache::VortexFileCache;
3636
use super::metrics::PARTITION_LABEL;
3737
use super::opener::VortexOpener;
38+
use crate::DefaultVortexReaderFactory;
39+
use crate::VortexReaderFactory;
3840
use crate::convert::exprs::DefaultExpressionConvertor;
3941
use crate::convert::exprs::ExpressionConvertor;
4042

@@ -60,6 +62,7 @@ pub struct VortexSource {
6062
/// Sharing the readers allows us to only read every layout once from the file, even across partitions.
6163
layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
6264
expression_convertor: Arc<dyn ExpressionConvertor>,
65+
pub(crate) vortex_reader_factory: Option<Arc<dyn VortexReaderFactory>>,
6366
}
6467

6568
impl VortexSource {
@@ -83,6 +86,7 @@ impl VortexSource {
8386
_unused_df_metrics: Default::default(),
8487
layout_readers: Arc::new(DashMap::default()),
8588
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
89+
vortex_reader_factory: None,
8690
}
8791
}
8892

@@ -94,6 +98,17 @@ impl VortexSource {
9498
self.expression_convertor = expr_convertor;
9599
self
96100
}
101+
102+
/// Set a user-defined factory to create the underlying [`VortexReadAt`]
103+
///
104+
/// [`VortexReadAt`]: vortex::io::VortexReadAt
105+
pub fn with_vortex_reader_factory(
106+
mut self,
107+
vortex_reader_factory: Arc<dyn VortexReaderFactory>,
108+
) -> Self {
109+
self.vortex_reader_factory = Some(vortex_reader_factory);
110+
self
111+
}
97112
}
98113

99114
impl FileSource for VortexSource {
@@ -117,9 +132,14 @@ impl FileSource for VortexSource {
117132
.clone()
118133
.unwrap_or_else(|| Arc::new(DefaultPhysicalExprAdapterFactory));
119134

135+
let vortex_reader_factory = self
136+
.vortex_reader_factory
137+
.clone()
138+
.unwrap_or_else(|| Arc::new(DefaultVortexReaderFactory::new(object_store)));
139+
120140
let opener = VortexOpener {
121141
session: self.session.clone(),
122-
object_store,
142+
vortex_reader_factory,
123143
projection: self.projection.clone(),
124144
filter: self.vortex_predicate.clone(),
125145
file_pruning_predicate: self.full_predicate.clone(),

0 commit comments

Comments
 (0)