Skip to content

Commit 5cb7ebb

Browse files
peasee and gatesn
authored and committed
feat: Support retrieving WriteStrategyBuilder from VortexSession (#6)
* Fix session get-or-default (vortex-data#5662)
  The comments described this method as get-or-default, but the implementation panicked instead.
  ---------
  Signed-off-by: Nicholas Gates <nick@nickgates.com>
* feat: Support retrieving writer strategy builder from vortex session
  ---------
  Signed-off-by: Nicholas Gates <nick@nickgates.com>
  Co-authored-by: Nicholas Gates <gatesn@users.noreply.github.com>
1 parent 0f8fc17 commit 5cb7ebb

4 files changed

Lines changed: 51 additions & 26 deletions

File tree

vortex-file/src/strategy.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@ const ONE_MEG: u64 = 1 << 20;
2828
/// Vortex provides an out-of-the-box file writer that optimizes the layout of chunks on-disk,
2929
/// repartitioning and compressing them to strike a balance between size on-disk,
3030
/// bulk decoding performance, and IOPS required to perform an indexed read.
31+
#[derive(Debug, Clone)]
3132
pub struct WriteStrategyBuilder {
3233
compressor: Option<Arc<dyn CompressorPlugin>>,
3334
row_block_size: usize,

vortex-file/src/writer.rs

Lines changed: 23 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -68,9 +68,14 @@ pub struct VortexWriteOptions {
6868
pub trait WriteOptionsSessionExt: SessionExt {
6969
/// Create [`VortexWriteOptions`] for writing to a Vortex file.
7070
fn write_options(&self) -> VortexWriteOptions {
71+
let maybe_write_strategy_builder = self.get_opt::<WriteStrategyBuilder>();
72+
let strategy = maybe_write_strategy_builder
73+
.map(|opt| opt.clone().build())
74+
.unwrap_or_else(|| WriteStrategyBuilder::new().build());
75+
7176
VortexWriteOptions {
7277
session: self.session(),
73-
strategy: WriteStrategyBuilder::new().build(),
78+
strategy,
7479
exclude_dtype: false,
7580
file_statistics: PRUNING_STATS.to_vec(),
7681
max_variable_length_statistics_size: 64,
@@ -458,3 +463,20 @@ impl WriteSummary {
458463
self.footer.row_count()
459464
}
460465
}
466+
467+
#[cfg(test)]
mod tests {
    use super::*;

    /// A [`WriteStrategyBuilder`] is absent from an empty session and becomes retrievable
    /// through `get_opt` once it has been registered with `set`.
    #[test]
    fn test_write_options_from_session_vars() {
        let empty_session = VortexSession::empty();

        // The lookup result is a temporary, so it is released at the end of the
        // statement and `empty_session` can be moved into `set` below.
        assert!(empty_session.get_opt::<WriteStrategyBuilder>().is_none());

        let configured_session = empty_session.set(WriteStrategyBuilder::new());
        assert!(
            configured_session
                .get_opt::<WriteStrategyBuilder>()
                .is_some()
        );
    }
}

vortex-layout/src/layouts/compressed.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,7 @@ use crate::sequence::SequentialStreamExt;
2727
/// meet this interface.
2828
///
2929
/// API consumers are also free to implement this trait to provide new plugin compressors.
30-
pub trait CompressorPlugin: Send + Sync + 'static {
30+
pub trait CompressorPlugin: std::fmt::Debug + Send + Sync + 'static {
3131
fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef>;
3232
}
3333

@@ -39,7 +39,7 @@ impl CompressorPlugin for Arc<dyn CompressorPlugin> {
3939

4040
impl<F> CompressorPlugin for F
4141
where
42-
F: Fn(&dyn Array) -> VortexResult<ArrayRef> + Send + Sync + 'static,
42+
F: Fn(&dyn Array) -> VortexResult<ArrayRef> + Send + Sync + 'static + std::fmt::Debug,
4343
{
4444
fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
4545
self(chunk)

vortex-session/src/lib.rs

Lines changed: 25 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -3,20 +3,14 @@
33

44
pub mod registry;
55

6-
use std::any::Any;
7-
use std::any::TypeId;
8-
use std::any::type_name;
6+
use std::any::{Any, TypeId, type_name};
97
use std::fmt::Debug;
10-
use std::hash::BuildHasherDefault;
11-
use std::hash::Hasher;
12-
use std::ops::Deref;
13-
use std::ops::DerefMut;
8+
use std::hash::{BuildHasherDefault, Hasher};
9+
use std::ops::{Deref, DerefMut};
1410
use std::sync::Arc;
1511

16-
use dashmap::DashMap;
17-
use dashmap::Entry;
18-
use vortex_error::VortexExpect;
19-
use vortex_error::vortex_panic;
12+
use dashmap::{DashMap, Entry};
13+
use vortex_error::{VortexExpect, vortex_panic};
2014

2115
/// A Vortex session encapsulates the set of extensible arrays, layouts, compute functions, dtypes,
2216
/// etc. that are available for use in a given context.
@@ -52,6 +46,26 @@ impl VortexSession {
5246
}
5347
self
5448
}
49+
50+
/// Inserts a new session variable of type `V` with the supplied value.
51+
///
52+
/// # Panics
53+
///
54+
/// If a variable of that type already exists.
55+
pub fn set<V: SessionVar>(self, val: V) -> Self {
56+
match self.0.entry(TypeId::of::<V>()) {
57+
Entry::Occupied(_) => {
58+
vortex_panic!(
59+
"Session variable of type {} already exists",
60+
type_name::<V>()
61+
);
62+
}
63+
Entry::Vacant(e) => {
64+
e.insert(Box::new(val));
65+
}
66+
}
67+
self
68+
}
5569
}
5670

5771
/// Trait for accessing and modifying the state of a Vortex session.
@@ -88,18 +102,6 @@ impl SessionExt for VortexSession {
88102

89103
/// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
90104
fn get<V: SessionVar + Default>(&self) -> Ref<'_, V> {
91-
// NOTE(ngates): we don't use `entry().or_insert_with_key()` here because the DashMap
92-
// would immediately acquire an exclusive write lock.
93-
if let Some(v) = self.0.get(&TypeId::of::<V>()) {
94-
return Ref(v.map(|v| {
95-
(**v)
96-
.as_any()
97-
.downcast_ref::<V>()
98-
.vortex_expect("Type mismatch - this is a bug")
99-
}));
100-
}
101-
102-
// If we get here, the value was not present, so we insert the default with a write lock.
103105
Ref(self
104106
.0
105107
.entry(TypeId::of::<V>())

0 commit comments

Comments
 (0)