Skip to content

Commit b701e1e

Browse files
peaseegatesn
andauthored
feat: Support retrieving WriterStrategyBuilder from VortexSession (#6)
* Fix session get-or-default (vortex-data#5662) The comments described this get-or-default, but instead it was a panic --------- Signed-off-by: Nicholas Gates <nick@nickgates.com> * feat: Support retrieving writer strategy builder from vortex session --------- Signed-off-by: Nicholas Gates <nick@nickgates.com> Co-authored-by: Nicholas Gates <gatesn@users.noreply.github.com>
1 parent c279ace commit b701e1e

4 files changed

Lines changed: 85 additions & 15 deletions

File tree

vortex-file/src/strategy.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const ONE_MEG: u64 = 1 << 20;
2323
/// Vortex provides an out-of-the-box file writer that optimizes the layout of chunks on-disk,
2424
/// repartitioning and compressing them to strike a balance between size on-disk,
2525
/// bulk decoding performance, and IOPS required to perform an indexed read.
26+
#[derive(Debug, Clone)]
2627
pub struct WriteStrategyBuilder {
2728
compressor: Option<Arc<dyn CompressorPlugin>>,
2829
row_block_size: usize,

vortex-file/src/writer.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,14 @@ pub struct VortexWriteOptions {
4545
pub trait WriteOptionsSessionExt: SessionExt {
4646
/// Create [`VortexWriteOptions`] for writing to a Vortex file.
4747
fn write_options(&self) -> VortexWriteOptions {
48+
let maybe_write_strategy_builder = self.get_opt::<WriteStrategyBuilder>();
49+
let strategy = maybe_write_strategy_builder
50+
.map(|opt| opt.clone().build())
51+
.unwrap_or_else(|| WriteStrategyBuilder::new().build());
52+
4853
VortexWriteOptions {
4954
session: self.session(),
50-
strategy: WriteStrategyBuilder::new().build(),
55+
strategy,
5156
exclude_dtype: false,
5257
file_statistics: PRUNING_STATS.to_vec(),
5358
max_variable_length_statistics_size: 64,
@@ -431,3 +436,20 @@ impl WriteSummary {
431436
self.footer.row_count()
432437
}
433438
}
439+
440+
#[cfg(test)]
441+
mod tests {
442+
use super::*;
443+
444+
#[test]
445+
fn test_write_options_from_session_vars() {
446+
let session = VortexSession::empty();
447+
let fetched_write_strategy = session.get_opt::<WriteStrategyBuilder>();
448+
assert!(fetched_write_strategy.is_none());
449+
drop(fetched_write_strategy);
450+
451+
let session = session.set(WriteStrategyBuilder::new());
452+
let fetched_write_strategy = session.get_opt::<WriteStrategyBuilder>();
453+
assert!(fetched_write_strategy.is_some());
454+
}
455+
}

vortex-layout/src/layouts/compressed.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::{LayoutRef, LayoutStrategy};
2323
/// meet this interface.
2424
///
2525
/// API consumers are also free to implement this trait to provide new plugin compressors.
26-
pub trait CompressorPlugin: Send + Sync + 'static {
26+
pub trait CompressorPlugin: std::fmt::Debug + Send + Sync + 'static {
2727
fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef>;
2828
}
2929

@@ -35,7 +35,7 @@ impl CompressorPlugin for Arc<dyn CompressorPlugin> {
3535

3636
impl<F> CompressorPlugin for F
3737
where
38-
F: Fn(&dyn Array) -> VortexResult<ArrayRef> + Send + Sync + 'static,
38+
F: Fn(&dyn Array) -> VortexResult<ArrayRef> + Send + Sync + 'static + std::fmt::Debug,
3939
{
4040
fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
4141
self(chunk)

vortex-session/src/lib.rs

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,26 @@ impl VortexSession {
4646
}
4747
self
4848
}
49+
50+
/// Inserts a new session variable of type `V` with the supplied value.
51+
///
52+
/// # Panics
53+
///
54+
/// If a variable of that type already exists.
55+
pub fn set<V: SessionVar>(self, val: V) -> Self {
56+
match self.0.entry(TypeId::of::<V>()) {
57+
Entry::Occupied(_) => {
58+
vortex_panic!(
59+
"Session variable of type {} already exists",
60+
type_name::<V>()
61+
);
62+
}
63+
Entry::Vacant(e) => {
64+
e.insert(Box::new(val));
65+
}
66+
}
67+
self
68+
}
4969
}
5070

5171
/// Trait for accessing and modifying the state of a Vortex session.
@@ -54,12 +74,20 @@ pub trait SessionExt: Sized + private::Sealed {
5474
fn session(&self) -> VortexSession;
5575

5676
/// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
57-
fn get<V: SessionVar>(&self) -> Ref<'_, V>;
77+
fn get<V: SessionVar + Default>(&self) -> Ref<'_, V>;
78+
79+
/// Returns the scope variable of type `V` if it exists.
80+
fn get_opt<V: SessionVar>(&self) -> Option<Ref<'_, V>>;
5881

5982
/// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
6083
///
6184
/// Note that the returned value internally holds a lock on the variable.
62-
fn get_mut<V: SessionVar>(&self) -> RefMut<'_, V>;
85+
fn get_mut<V: SessionVar + Default>(&self) -> RefMut<'_, V>;
86+
87+
/// Returns the scope variable of type `V`, if it exists.
88+
///
89+
/// Note that the returned value internally holds a lock on the variable.
90+
fn get_mut_opt<V: SessionVar>(&self) -> Option<RefMut<'_, V>>;
6391
}
6492

6593
mod private {
@@ -73,13 +101,12 @@ impl SessionExt for VortexSession {
73101
}
74102

75103
/// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
76-
fn get<V: SessionVar>(&self) -> Ref<'_, V> {
104+
fn get<V: SessionVar + Default>(&self) -> Ref<'_, V> {
77105
Ref(self
78106
.0
79-
.get(&TypeId::of::<V>())
80-
.unwrap_or_else(|| {
81-
vortex_panic!("Session has not been initialized with {}", type_name::<V>())
82-
})
107+
.entry(TypeId::of::<V>())
108+
.or_insert_with(|| Box::new(V::default()))
109+
.downgrade()
83110
.map(|v| {
84111
(**v)
85112
.as_any()
@@ -88,16 +115,25 @@ impl SessionExt for VortexSession {
88115
}))
89116
}
90117

118+
fn get_opt<V: SessionVar>(&self) -> Option<Ref<'_, V>> {
119+
self.0.get(&TypeId::of::<V>()).map(|v| {
120+
Ref(v.map(|v| {
121+
(**v)
122+
.as_any()
123+
.downcast_ref::<V>()
124+
.vortex_expect("Type mismatch - this is a bug")
125+
}))
126+
})
127+
}
128+
91129
/// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
92130
///
93131
/// Note that the returned value internally holds a lock on the variable.
94-
fn get_mut<V: SessionVar>(&self) -> RefMut<'_, V> {
132+
fn get_mut<V: SessionVar + Default>(&self) -> RefMut<'_, V> {
95133
RefMut(
96134
self.0
97-
.get_mut(&TypeId::of::<V>())
98-
.unwrap_or_else(|| {
99-
vortex_panic!("Session has not been initialized with {}", type_name::<V>())
100-
})
135+
.entry(TypeId::of::<V>())
136+
.or_insert_with(|| Box::new(V::default()))
101137
.map(|v| {
102138
(**v)
103139
.as_any_mut()
@@ -106,6 +142,17 @@ impl SessionExt for VortexSession {
106142
}),
107143
)
108144
}
145+
146+
fn get_mut_opt<V: SessionVar>(&self) -> Option<RefMut<'_, V>> {
147+
self.0.get_mut(&TypeId::of::<V>()).map(|v| {
148+
RefMut(v.map(|v| {
149+
(**v)
150+
.as_any_mut()
151+
.downcast_mut::<V>()
152+
.vortex_expect("Type mismatch - this is a bug")
153+
}))
154+
})
155+
}
109156
}
110157

111158
/// A TypeMap based on `https://docs.rs/http/1.2.0/src/http/extensions.rs.html#41-266`.

0 commit comments

Comments
 (0)