Skip to content

Commit 81aa3be

Browse files
peaseegatesn
authored andcommitted
feat: Support retrieving WriterStrategyBuilder from VortexSession (#6)
* Fix session get-or-default (vortex-data#5662) The comments described this get-or-default, but instead it was a panic --------- Signed-off-by: Nicholas Gates <nick@nickgates.com> * feat: Support retrieving writer strategy builder from vortex session --------- Signed-off-by: Nicholas Gates <nick@nickgates.com> Co-authored-by: Nicholas Gates <gatesn@users.noreply.github.com>
1 parent 3632215 commit 81aa3be

4 files changed

Lines changed: 63 additions & 28 deletions

File tree

vortex-file/src/strategy.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,25 @@ pub static ALLOWED_ENCODINGS: LazyLock<ArrayRegistry> = LazyLock::new(|| {
111111
/// Vortex provides an out-of-the-box file writer that optimizes the layout of chunks on-disk,
112112
/// repartitioning and compressing them to strike a balance between size on-disk,
113113
/// bulk decoding performance, and IOPS required to perform an indexed read.
114+
#[derive(Clone)]
114115
pub struct WriteStrategyBuilder {
115116
compressor: Option<Arc<dyn CompressorPlugin>>,
116117
row_block_size: usize,
117118
field_writers: HashMap<FieldPath, Arc<dyn LayoutStrategy>>,
118119
allow_encodings: Option<ArrayRegistry>,
119120
}
120121

122+
impl std::fmt::Debug for WriteStrategyBuilder {
123+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124+
f.debug_struct("WriteStrategyBuilder")
125+
.field("compressor", &self.compressor)
126+
.field("row_block_size", &self.row_block_size)
127+
.field("field_writers", &format!("({} entries)", self.field_writers.len()))
128+
.field("allow_encodings", &self.allow_encodings.as_ref().map(|_| "..."))
129+
.finish()
130+
}
131+
}
132+
121133
impl Default for WriteStrategyBuilder {
122134
/// Create a new empty builder. It can be further configured,
123135
/// and then finally built yielding the [`LayoutStrategy`].

vortex-file/src/writer.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,14 @@ pub struct VortexWriteOptions {
6969
pub trait WriteOptionsSessionExt: SessionExt {
7070
/// Create [`VortexWriteOptions`] for writing to a Vortex file.
7171
fn write_options(&self) -> VortexWriteOptions {
72-
let session = self.session();
72+
let maybe_write_strategy_builder = self.get_opt::<WriteStrategyBuilder>();
73+
let strategy = maybe_write_strategy_builder
74+
.map(|opt| opt.clone().build())
75+
.unwrap_or_else(|| WriteStrategyBuilder::default().build());
76+
7377
VortexWriteOptions {
74-
strategy: WriteStrategyBuilder::default().build(),
75-
session,
78+
session: self.session(),
79+
strategy,
7680
exclude_dtype: false,
7781
file_statistics: PRUNING_STATS.to_vec(),
7882
max_variable_length_statistics_size: 64,
@@ -462,3 +466,20 @@ impl WriteSummary {
462466
self.footer.row_count()
463467
}
464468
}
469+
470+
#[cfg(test)]
471+
mod tests {
472+
use super::*;
473+
474+
#[test]
475+
fn test_write_options_from_session_vars() {
476+
let session = VortexSession::empty();
477+
let fetched_write_strategy = session.get_opt::<WriteStrategyBuilder>();
478+
assert!(fetched_write_strategy.is_none());
479+
drop(fetched_write_strategy);
480+
481+
let session = session.set(WriteStrategyBuilder::new());
482+
let fetched_write_strategy = session.get_opt::<WriteStrategyBuilder>();
483+
assert!(fetched_write_strategy.is_some());
484+
}
485+
}

vortex-layout/src/layouts/compressed.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use crate::sequence::SequentialStreamExt;
2929
/// meet this interface.
3030
///
3131
/// API consumers are also free to implement this trait to provide new plugin compressors.
32-
pub trait CompressorPlugin: Send + Sync + 'static {
32+
pub trait CompressorPlugin: std::fmt::Debug + Send + Sync + 'static {
3333
fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef>;
3434
}
3535

@@ -41,7 +41,7 @@ impl CompressorPlugin for Arc<dyn CompressorPlugin> {
4141

4242
impl<F> CompressorPlugin for F
4343
where
44-
F: Fn(&dyn Array) -> VortexResult<ArrayRef> + Send + Sync + 'static,
44+
F: Fn(&dyn Array) -> VortexResult<ArrayRef> + Send + Sync + 'static + std::fmt::Debug,
4545
{
4646
fn compress_chunk(&self, chunk: &dyn Array) -> VortexResult<ArrayRef> {
4747
self(chunk)

vortex-session/src/lib.rs

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,14 @@
33

44
pub mod registry;
55

6-
use std::any::Any;
7-
use std::any::TypeId;
8-
use std::any::type_name;
6+
use std::any::{Any, TypeId, type_name};
97
use std::fmt::Debug;
10-
use std::hash::BuildHasherDefault;
11-
use std::hash::Hasher;
12-
use std::ops::Deref;
13-
use std::ops::DerefMut;
8+
use std::hash::{BuildHasherDefault, Hasher};
9+
use std::ops::{Deref, DerefMut};
1410
use std::sync::Arc;
1511

16-
use dashmap::DashMap;
17-
use dashmap::Entry;
18-
use vortex_error::VortexExpect;
19-
use vortex_error::vortex_panic;
12+
use dashmap::{DashMap, Entry};
13+
use vortex_error::{VortexExpect, vortex_panic};
2014

2115
/// A Vortex session encapsulates the set of extensible arrays, layouts, compute functions, dtypes,
2216
/// etc. that are available for use in a given context.
@@ -52,6 +46,26 @@ impl VortexSession {
5246
}
5347
self
5448
}
49+
50+
/// Inserts a new session variable of type `V` with the supplied value.
51+
///
52+
/// # Panics
53+
///
54+
/// If a variable of that type already exists.
55+
pub fn set<V: SessionVar>(self, val: V) -> Self {
56+
match self.0.entry(TypeId::of::<V>()) {
57+
Entry::Occupied(_) => {
58+
vortex_panic!(
59+
"Session variable of type {} already exists",
60+
type_name::<V>()
61+
);
62+
}
63+
Entry::Vacant(e) => {
64+
e.insert(Box::new(val));
65+
}
66+
}
67+
self
68+
}
5569
}
5670

5771
/// Trait for accessing and modifying the state of a Vortex session.
@@ -88,18 +102,6 @@ impl SessionExt for VortexSession {
88102

89103
/// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
90104
fn get<V: SessionVar + Default>(&self) -> Ref<'_, V> {
91-
// NOTE(ngates): we don't use `entry().or_insert_with_key()` here because the DashMap
92-
// would immediately acquire an exclusive write lock.
93-
if let Some(v) = self.0.get(&TypeId::of::<V>()) {
94-
return Ref(v.map(|v| {
95-
(**v)
96-
.as_any()
97-
.downcast_ref::<V>()
98-
.vortex_expect("Type mismatch - this is a bug")
99-
}));
100-
}
101-
102-
// If we get here, the value was not present, so we insert the default with a write lock.
103105
Ref(self
104106
.0
105107
.entry(TypeId::of::<V>())

0 commit comments

Comments
 (0)