Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions vortex-file/src/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const ONE_MEG: u64 = 1 << 20;
/// Vortex provides an out-of-the-box file writer that optimizes the layout of chunks on-disk,
/// repartitioning and compressing them to strike a balance between size on-disk,
/// bulk decoding performance, and IOPS required to perform an indexed read.
#[derive(Debug, Clone)]
pub struct WriteStrategyBuilder {
compressor: Option<Arc<dyn CompressorPlugin>>,
row_block_size: usize,
Expand Down
24 changes: 23 additions & 1 deletion vortex-file/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,14 @@ pub struct VortexWriteOptions {
pub trait WriteOptionsSessionExt: SessionExt {
/// Create [`VortexWriteOptions`] for writing to a Vortex file.
fn write_options(&self) -> VortexWriteOptions {
let maybe_write_strategy_builder = self.get_opt::<WriteStrategyBuilder>();
let strategy = maybe_write_strategy_builder
.map(|opt| opt.clone().build())
.unwrap_or_else(|| WriteStrategyBuilder::new().build());

VortexWriteOptions {
session: self.session(),
strategy: WriteStrategyBuilder::new().build(),
strategy,
exclude_dtype: false,
file_statistics: PRUNING_STATS.to_vec(),
max_variable_length_statistics_size: 64,
Expand Down Expand Up @@ -431,3 +436,20 @@ impl WriteSummary {
self.footer.row_count()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_write_options_from_session_vars() {
let session = VortexSession::empty();
let fetched_write_strategy = session.get_opt::<WriteStrategyBuilder>();
assert!(fetched_write_strategy.is_none());
drop(fetched_write_strategy);

let session = session.set(WriteStrategyBuilder::new());
let fetched_write_strategy = session.get_opt::<WriteStrategyBuilder>();
assert!(fetched_write_strategy.is_some());
}
}
4 changes: 2 additions & 2 deletions vortex-layout/src/layouts/compressed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{LayoutRef, LayoutStrategy};
/// meet this interface.
///
/// API consumers are also free to implement this trait to provide new plugin compressors.
pub trait CompressorPlugin: Send + Sync + 'static {
pub trait CompressorPlugin: std::fmt::Debug + Send + Sync + 'static {
fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef>;
}

Expand All @@ -35,7 +35,7 @@ impl CompressorPlugin for Arc<dyn CompressorPlugin> {

impl<F> CompressorPlugin for F
where
F: Fn(&dyn Array) -> VortexResult<ArrayRef> + Send + Sync + 'static,
F: Fn(&dyn Array) -> VortexResult<ArrayRef> + Send + Sync + 'static + std::fmt::Debug,
Comment thread
peasee marked this conversation as resolved.
{
fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
self(chunk)
Expand Down
71 changes: 59 additions & 12 deletions vortex-session/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,26 @@ impl VortexSession {
}
self
}

/// Inserts a new session variable of type `V` with the supplied value.
///
/// # Panics
///
/// If a variable of that type already exists.
pub fn set<V: SessionVar>(self, val: V) -> Self {
match self.0.entry(TypeId::of::<V>()) {
Entry::Occupied(_) => {
vortex_panic!(
"Session variable of type {} already exists",
type_name::<V>()
);
}
Entry::Vacant(e) => {
e.insert(Box::new(val));
}
}
self
}
}

/// Trait for accessing and modifying the state of a Vortex session.
Expand All @@ -54,12 +74,20 @@ pub trait SessionExt: Sized + private::Sealed {
fn session(&self) -> VortexSession;

/// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
fn get<V: SessionVar>(&self) -> Ref<'_, V>;
fn get<V: SessionVar + Default>(&self) -> Ref<'_, V>;

/// Returns the scope variable of type `V` if it exists.
fn get_opt<V: SessionVar>(&self) -> Option<Ref<'_, V>>;

/// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
///
/// Note that the returned value internally holds a lock on the variable.
fn get_mut<V: SessionVar>(&self) -> RefMut<'_, V>;
fn get_mut<V: SessionVar + Default>(&self) -> RefMut<'_, V>;

/// Returns the scope variable of type `V`, if it exists.
///
/// Note that the returned value internally holds a lock on the variable.
fn get_mut_opt<V: SessionVar>(&self) -> Option<RefMut<'_, V>>;
}

mod private {
Expand All @@ -73,13 +101,12 @@ impl SessionExt for VortexSession {
}

/// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
fn get<V: SessionVar>(&self) -> Ref<'_, V> {
fn get<V: SessionVar + Default>(&self) -> Ref<'_, V> {
Ref(self
.0
.get(&TypeId::of::<V>())
.unwrap_or_else(|| {
vortex_panic!("Session has not been initialized with {}", type_name::<V>())
})
.entry(TypeId::of::<V>())
.or_insert_with(|| Box::new(V::default()))
.downgrade()
.map(|v| {
(**v)
.as_any()
Expand All @@ -88,16 +115,25 @@ impl SessionExt for VortexSession {
}))
}

fn get_opt<V: SessionVar>(&self) -> Option<Ref<'_, V>> {
self.0.get(&TypeId::of::<V>()).map(|v| {
Ref(v.map(|v| {
(**v)
.as_any()
.downcast_ref::<V>()
.vortex_expect("Type mismatch - this is a bug")
}))
})
}

/// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
///
/// Note that the returned value internally holds a lock on the variable.
fn get_mut<V: SessionVar>(&self) -> RefMut<'_, V> {
fn get_mut<V: SessionVar + Default>(&self) -> RefMut<'_, V> {
RefMut(
self.0
.get_mut(&TypeId::of::<V>())
.unwrap_or_else(|| {
vortex_panic!("Session has not been initialized with {}", type_name::<V>())
})
.entry(TypeId::of::<V>())
.or_insert_with(|| Box::new(V::default()))
.map(|v| {
(**v)
.as_any_mut()
Expand All @@ -106,6 +142,17 @@ impl SessionExt for VortexSession {
}),
)
}

fn get_mut_opt<V: SessionVar>(&self) -> Option<RefMut<'_, V>> {
self.0.get_mut(&TypeId::of::<V>()).map(|v| {
RefMut(v.map(|v| {
(**v)
.as_any_mut()
.downcast_mut::<V>()
.vortex_expect("Type mismatch - this is a bug")
}))
})
}
}

/// A TypeMap based on `https://docs.rs/http/1.2.0/src/http/extensions.rs.html#41-266`.
Expand Down
Loading