Skip to content

Commit f61364e

Browse files
gatesnblaginin
andauthored
Dataset API (#5949)
# Dataset API A unified API to sit between query engines and data sources that preserves support for late materialization, deferred decompression, and alternate device buffers. # Open Questions * What does capabilities negotiation look like? e.g. for unsupported encodings? * What about shared dictionaries? * Is deserialize_split in the right place? Without serde, splits can share state in the DataSourceScan. With serde, there's no where for shared state to live. Perhaps we should reconstruct a datasource on the worker? Should the datasource be serializable? --------- Signed-off-by: Nicholas Gates <[email protected]> Co-authored-by: Dmitrii Blaginin <[email protected]>
1 parent 1522642 commit f61364e

File tree

7 files changed

+292
-0
lines changed

7 files changed

+292
-0
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vortex-scan/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ vortex-mask = { workspace = true }
2929
vortex-metrics = { workspace = true }
3030
vortex-session = { workspace = true }
3131

32+
async-trait = { workspace = true }
3233
bit-vec = { workspace = true }
3334
futures = { workspace = true }
3435
itertools = { workspace = true }

vortex-scan/src/api.rs

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
#![deny(missing_docs)]
5+
6+
//! The Vortex Scan API implements an abstract table scan interface that can be used to
7+
//! read data from various data sources.
8+
//!
9+
//! It supports arbitrary projection expressions, filter expressions, and limit pushdown as well
10+
//! as mechanisms for parallel and distributed execution via splits.
11+
//!
12+
//! The API is currently under development and may change in future releases, however we hope to
13+
//! stabilize into stable C ABI for use within foreign language bindings.
14+
//!
15+
//! ## Open Issues
16+
//!
17+
//! * We probably want to make the DataSource serializable as well, so that we can share
18+
//! source-level state with workers, separately from split serialization.
19+
//! * We should add a way for the client to negotiate capabilities with the data source, for
20+
//! example which encodings it knows about.
21+
22+
use std::any::Any;
23+
use std::sync::Arc;
24+
25+
use async_trait::async_trait;
26+
use futures::stream::BoxStream;
27+
use vortex_array::expr::Expression;
28+
use vortex_array::stream::SendableArrayStream;
29+
use vortex_dtype::DType;
30+
use vortex_error::VortexResult;
31+
use vortex_session::VortexSession;
32+
33+
/// Create a Vortex source from serialized configuration.
34+
///
35+
/// Providers can be registered with Vortex under a specific
36+
#[async_trait(?Send)]
37+
pub trait DataSourceProvider: 'static {
38+
/// Attempt to initialize a new source.
39+
///
40+
/// Returns `Ok(None)` if the provider cannot handle the given URI.
41+
async fn initialize(
42+
&self,
43+
uri: String,
44+
session: &VortexSession,
45+
) -> VortexResult<Option<DataSourceRef>>;
46+
}
47+
48+
/// A reference-counted data source.
49+
pub type DataSourceRef = Arc<dyn DataSource>;
50+
51+
/// A data source represents a streamable dataset that can be scanned with projection and filter
52+
/// expressions. Each scan produces splits that can be executed (potentially in parallel) to read
53+
/// data. Each split can be serialized for remote execution.
54+
///
55+
/// The DataSource may be used multiple times to create multiple scans, whereas each scan and each
56+
/// split of a scan can only be consumed once.
57+
#[async_trait]
58+
pub trait DataSource: 'static + Send + Sync {
59+
/// Returns the dtype of the source.
60+
fn dtype(&self) -> &DType;
61+
62+
/// Returns an estimate of the row count of the source.
63+
fn row_count_estimate(&self) -> Estimate<u64>;
64+
65+
/// Returns a scan over the source.
66+
async fn scan(&self, scan_request: ScanRequest) -> VortexResult<DataSourceScanRef>;
67+
68+
/// Serialize a split from this data source.
69+
fn serialize_split(&self, split: &dyn Split) -> VortexResult<Vec<u8>>;
70+
71+
/// Deserialize a split that was previously serialized from a compatible data source.
72+
fn deserialize_split(&self, data: &[u8]) -> VortexResult<SplitRef>;
73+
}
74+
75+
/// A request to scan a data source.
76+
#[derive(Debug, Clone, Default)]
77+
pub struct ScanRequest {
78+
/// Projection expression, `None` implies `root()`.
79+
pub projection: Option<Expression>,
80+
/// Filter expression, `None` implies no filter.
81+
pub filter: Option<Expression>,
82+
/// Optional limit on the number of rows to scan.
83+
pub limit: Option<u64>,
84+
}
85+
86+
/// A boxed data source scan.
87+
pub type DataSourceScanRef = Box<dyn DataSourceScan>;
88+
89+
/// A data source scan produces splits that can be executed to read data from the source.
90+
#[async_trait]
91+
pub trait DataSourceScan: 'static + Send {
92+
/// The returned dtype of the scan.
93+
fn dtype(&self) -> &DType;
94+
95+
/// An estimate of the remaining splits.
96+
fn remaining_splits_estimate(&self) -> Estimate<usize>;
97+
98+
/// Returns the next batch of splits to be processed.
99+
///
100+
/// This should not return _more_ than `max_splits` splits, but may return fewer.
101+
async fn next_splits(&mut self, max_splits: usize) -> VortexResult<Vec<SplitRef>>;
102+
}
103+
104+
/// A stream of splits.
105+
pub type SplitStream = BoxStream<'static, VortexResult<SplitRef>>;
106+
107+
/// A reference-counted split.
108+
pub type SplitRef = Box<dyn Split>;
109+
110+
/// A split represents a unit of work that can be executed to produce a stream of arrays.
111+
pub trait Split: 'static + Send {
112+
/// Downcast the split to a concrete type.
113+
fn as_any(&self) -> &dyn Any;
114+
115+
/// Executes the split.
116+
fn execute(self: Box<Self>) -> VortexResult<SendableArrayStream>;
117+
118+
/// Returns an estimate of the row count for this split.
119+
fn row_count_estimate(&self) -> Estimate<u64>;
120+
121+
/// Returns an estimate of the byte size for this split.
122+
fn byte_size_estimate(&self) -> Estimate<u64>;
123+
}
124+
125+
/// An estimate that can be exact, an upper bound, or unknown.
126+
#[derive(Default)]
127+
pub enum Estimate<T> {
128+
/// The exact value.
129+
Exact(T),
130+
/// An upper bound on the value.
131+
UpperBound(T),
132+
/// The value is unknown.
133+
#[default]
134+
Unknown,
135+
}

vortex-scan/src/layout.rs

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::any::Any;
5+
use std::collections::VecDeque;
6+
7+
use async_trait::async_trait;
8+
use futures::StreamExt;
9+
use futures::future::BoxFuture;
10+
use futures::stream;
11+
use vortex_array::ArrayRef;
12+
use vortex_array::stream::ArrayStreamAdapter;
13+
use vortex_array::stream::SendableArrayStream;
14+
use vortex_dtype::DType;
15+
use vortex_error::VortexExpect;
16+
use vortex_error::VortexResult;
17+
use vortex_error::vortex_bail;
18+
use vortex_layout::LayoutReaderRef;
19+
use vortex_session::VortexSession;
20+
21+
use crate::ScanBuilder;
22+
use crate::api::DataSource;
23+
use crate::api::DataSourceScan;
24+
use crate::api::DataSourceScanRef;
25+
use crate::api::Estimate;
26+
use crate::api::ScanRequest;
27+
use crate::api::Split;
28+
use crate::api::SplitRef;
29+
30+
/// An implementation of a [`DataSource`] that reads data from a [`LayoutReaderRef`].
31+
pub struct LayoutReaderDataSource {
32+
reader: LayoutReaderRef,
33+
session: VortexSession,
34+
}
35+
36+
impl LayoutReaderDataSource {
37+
/// Creates a new [`LayoutReaderDataSource`].
38+
pub fn new(reader: LayoutReaderRef, session: VortexSession) -> Self {
39+
Self { reader, session }
40+
}
41+
}
42+
43+
#[async_trait]
44+
impl DataSource for LayoutReaderDataSource {
45+
fn dtype(&self) -> &DType {
46+
self.reader.dtype()
47+
}
48+
49+
fn row_count_estimate(&self) -> Estimate<u64> {
50+
Estimate::Exact(self.reader.row_count())
51+
}
52+
53+
async fn scan(&self, scan_request: ScanRequest) -> VortexResult<DataSourceScanRef> {
54+
let mut builder = ScanBuilder::new(self.session.clone(), self.reader.clone());
55+
56+
if let Some(projection) = scan_request.projection {
57+
builder = builder.with_projection(projection);
58+
}
59+
60+
if let Some(filter) = scan_request.filter {
61+
builder = builder.with_filter(filter);
62+
}
63+
64+
if let Some(limit) = scan_request.limit {
65+
// TODO(ngates): ScanBuilder limit should be u64
66+
let limit = usize::try_from(limit)?;
67+
builder = builder.with_limit(limit);
68+
}
69+
70+
let scan = builder.prepare()?;
71+
let dtype = scan.dtype().clone();
72+
let splits = scan.execute(None)?;
73+
74+
Ok(Box::new(LayoutReaderScan {
75+
dtype,
76+
splits: VecDeque::from_iter(splits),
77+
}))
78+
}
79+
80+
fn serialize_split(&self, _split: &dyn Split) -> VortexResult<Vec<u8>> {
81+
vortex_bail!("LayoutReader splits are not yet serializable");
82+
}
83+
84+
fn deserialize_split(&self, _split: &[u8]) -> VortexResult<SplitRef> {
85+
vortex_bail!("LayoutReader splits are not yet serializable");
86+
}
87+
}
88+
89+
struct LayoutReaderScan {
90+
dtype: DType,
91+
splits: VecDeque<BoxFuture<'static, VortexResult<Option<ArrayRef>>>>,
92+
}
93+
94+
#[async_trait]
95+
impl DataSourceScan for LayoutReaderScan {
96+
fn dtype(&self) -> &DType {
97+
&self.dtype
98+
}
99+
100+
fn remaining_splits_estimate(&self) -> Estimate<usize> {
101+
Estimate::Exact(self.splits.len())
102+
}
103+
104+
async fn next_splits(&mut self, max_splits: usize) -> VortexResult<Vec<SplitRef>> {
105+
let n = std::cmp::min(max_splits, self.splits.len());
106+
107+
let mut splits = Vec::with_capacity(n);
108+
for _ in 0..n {
109+
let fut = self
110+
.splits
111+
.pop_front()
112+
.vortex_expect("Checked length above ensures we have enough splits");
113+
splits.push(Box::new(LayoutReaderSplit {
114+
dtype: self.dtype.clone(),
115+
fut,
116+
}) as SplitRef);
117+
}
118+
119+
Ok(splits)
120+
}
121+
}
122+
123+
struct LayoutReaderSplit {
124+
dtype: DType,
125+
fut: BoxFuture<'static, VortexResult<Option<ArrayRef>>>,
126+
}
127+
128+
#[async_trait]
129+
impl Split for LayoutReaderSplit {
130+
fn as_any(&self) -> &dyn Any {
131+
self
132+
}
133+
134+
fn execute(self: Box<Self>) -> VortexResult<SendableArrayStream> {
135+
Ok(Box::pin(ArrayStreamAdapter::new(
136+
self.dtype,
137+
stream::once(self.fut).filter_map(|a| async move { a.transpose() }),
138+
)))
139+
}
140+
141+
fn row_count_estimate(&self) -> Estimate<u64> {
142+
Estimate::Unknown
143+
}
144+
145+
fn byte_size_estimate(&self) -> Estimate<u64> {
146+
Estimate::Unknown
147+
}
148+
}

vortex-scan/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
/// We don't actually know if this is right, but it is probably a good estimate.
77
const IDEAL_SPLIT_SIZE: u64 = 100_000;
88

9+
pub mod api;
910
pub mod arrow;
1011
mod filter;
1112
pub mod row_mask;
@@ -21,6 +22,7 @@ pub use split_by::SplitBy;
2122
mod scan_builder;
2223
pub use scan_builder::ScanBuilder;
2324

25+
pub mod layout;
2426
mod repeated_scan;
2527
#[cfg(test)]
2628
mod test;

vortex-scan/src/repeated_scan.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ pub struct RepeatedScan<A: 'static + Send> {
5656
}
5757

5858
impl RepeatedScan<ArrayRef> {
59+
pub fn dtype(&self) -> &DType {
60+
&self.dtype
61+
}
62+
5963
pub fn execute_array_iter<B: BlockingRuntime>(
6064
&self,
6165
row_range: Option<Range<u64>>,

vortex/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub mod compressor {
4040
pub mod dtype {
4141
pub use vortex_dtype::*;
4242
}
43+
4344
pub mod error {
4445
pub use vortex_error::*;
4546
}

0 commit comments

Comments
 (0)