Skip to content

Commit 06c84f6

Browse files
authored
Update FSST to store components of compressed codes (vortex-data#6325)
Fixes vortex-data#6293 --------- Signed-off-by: Nicholas Gates <nick@nickgates.com>
1 parent 7ded351 commit 06c84f6

3 files changed

Lines changed: 214 additions & 53 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-

1+


encodings/fsst/src/array.rs

Lines changed: 200 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,18 @@ use vortex_array::builders::VarBinViewBuilder;
3131
use vortex_array::serde::ArrayChildren;
3232
use vortex_array::stats::ArrayStats;
3333
use vortex_array::stats::StatsSetRef;
34+
use vortex_array::validity::Validity;
3435
use vortex_array::vtable;
3536
use vortex_array::vtable::ArrayId;
3637
use vortex_array::vtable::BaseArrayVTable;
3738
use vortex_array::vtable::NotSupported;
3839
use vortex_array::vtable::VTable;
3940
use vortex_array::vtable::ValidityChild;
41+
use vortex_array::vtable::ValidityHelper;
4042
use vortex_array::vtable::ValidityVTableFromChild;
4143
use vortex_array::vtable::VisitorVTable;
4244
use vortex_buffer::Buffer;
45+
use vortex_buffer::ByteBuffer;
4346
use vortex_dtype::DType;
4447
use vortex_dtype::Nullability;
4548
use vortex_dtype::PType;
@@ -59,6 +62,9 @@ vtable!(FSST);
5962
pub struct FSSTMetadata {
6063
#[prost(enumeration = "PType", tag = "1")]
6164
uncompressed_lengths_ptype: i32,
65+
66+
#[prost(enumeration = "PType", tag = "2")]
67+
codes_offsets_ptype: i32,
6268
}
6369

6470
impl FSSTMetadata {
@@ -85,8 +91,8 @@ impl VTable for FSSTVTable {
8591

8692
fn metadata(array: &FSSTArray) -> VortexResult<Self::Metadata> {
8793
Ok(ProstMetadata(FSSTMetadata {
88-
uncompressed_lengths_ptype: PType::try_from(array.uncompressed_lengths().dtype())?
89-
as i32,
94+
uncompressed_lengths_ptype: array.uncompressed_lengths().dtype().as_ptype().into(),
95+
codes_offsets_ptype: array.codes.offsets().dtype().as_ptype().into(),
9096
}))
9197
}
9298

@@ -100,48 +106,118 @@ impl VTable for FSSTVTable {
100106
))
101107
}
102108

109+
fn append_to_builder(
110+
array: &FSSTArray,
111+
builder: &mut dyn ArrayBuilder,
112+
ctx: &mut ExecutionCtx,
113+
) -> VortexResult<()> {
114+
let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
115+
builder.extend_from_array(&array.to_array().execute::<Canonical>(ctx)?.into_array());
116+
return Ok(());
117+
};
118+
119+
// Decompress the whole block of data into a new buffer, and create some views
120+
// from it instead.
121+
let (buffers, views) = fsst_decode_views(array, builder.completed_block_count(), ctx)?;
122+
123+
builder.push_buffer_and_adjusted_views(&buffers, &views, array.validity_mask()?);
124+
Ok(())
125+
}
126+
103127
fn build(
104128
dtype: &DType,
105129
len: usize,
106130
metadata: &Self::Metadata,
107131
buffers: &[BufferHandle],
108132
children: &dyn ArrayChildren,
109133
) -> VortexResult<FSSTArray> {
110-
if buffers.len() != 2 {
111-
vortex_bail!(InvalidArgument: "Expected 2 buffers, got {}", buffers.len());
112-
}
113134
let symbols = Buffer::<Symbol>::from_byte_buffer(buffers[0].clone().try_to_host_sync()?);
114135
let symbol_lengths = Buffer::<u8>::from_byte_buffer(buffers[1].clone().try_to_host_sync()?);
115136

116-
if children.len() != 2 {
117-
vortex_bail!(InvalidArgument: "Expected 2 children, got {}", children.len());
137+
// Check for the legacy deserialization path.
138+
if buffers.len() == 2 {
139+
if children.len() != 2 {
140+
vortex_bail!(InvalidArgument: "Expected 2 children, got {}", children.len());
141+
}
142+
let codes = children.get(0, &DType::Binary(dtype.nullability()), len)?;
143+
let codes = codes
144+
.as_opt::<VarBinVTable>()
145+
.ok_or_else(|| {
146+
vortex_err!(
147+
"Expected VarBinArray for codes, got {}",
148+
codes.encoding_id()
149+
)
150+
})?
151+
.clone();
152+
let uncompressed_lengths = children.get(
153+
1,
154+
&DType::Primitive(
155+
metadata.0.get_uncompressed_lengths_ptype()?,
156+
Nullability::NonNullable,
157+
),
158+
len,
159+
)?;
160+
161+
return FSSTArray::try_new(
162+
dtype.clone(),
163+
symbols,
164+
symbol_lengths,
165+
codes,
166+
uncompressed_lengths,
167+
);
118168
}
119-
let codes = children.get(0, &DType::Binary(dtype.nullability()), len)?;
120-
let codes = codes
121-
.as_opt::<VarBinVTable>()
122-
.ok_or_else(|| {
123-
vortex_err!(
124-
"Expected VarBinArray for codes, got {}",
125-
codes.encoding_id()
126-
)
127-
})?
128-
.clone();
129-
let uncompressed_lengths = children.get(
130-
1,
131-
&DType::Primitive(
132-
metadata.0.get_uncompressed_lengths_ptype()?,
133-
Nullability::NonNullable,
134-
),
135-
len,
136-
)?;
137-
138-
FSSTArray::try_new(
139-
dtype.clone(),
140-
symbols,
141-
symbol_lengths,
142-
codes,
143-
uncompressed_lengths,
144-
)
169+
170+
// Check for the current deserialization path.
171+
if buffers.len() == 3 {
172+
let uncompressed_lengths = children.get(
173+
0,
174+
&DType::Primitive(
175+
metadata.0.get_uncompressed_lengths_ptype()?,
176+
Nullability::NonNullable,
177+
),
178+
len,
179+
)?;
180+
181+
let codes_buffer = ByteBuffer::from_byte_buffer(buffers[2].clone().try_to_host_sync()?);
182+
let codes_offsets = children.get(
183+
1,
184+
&DType::Primitive(
185+
PType::try_from(metadata.codes_offsets_ptype)?,
186+
Nullability::NonNullable,
187+
),
188+
// VarBin offsets are len + 1
189+
len + 1,
190+
)?;
191+
192+
let codes_validity = if children.len() == 2 {
193+
Validity::from(dtype.nullability())
194+
} else if children.len() == 3 {
195+
let validity = children.get(2, &Validity::DTYPE, len)?;
196+
Validity::Array(validity)
197+
} else {
198+
vortex_bail!("Expected 0 or 1 child, got {}", children.len());
199+
};
200+
201+
let codes = VarBinArray::try_new(
202+
codes_offsets,
203+
codes_buffer,
204+
DType::Binary(dtype.nullability()),
205+
codes_validity,
206+
)?;
207+
208+
return FSSTArray::try_new(
209+
dtype.clone(),
210+
symbols,
211+
symbol_lengths,
212+
codes,
213+
uncompressed_lengths,
214+
);
215+
}
216+
217+
vortex_bail!(
218+
"InvalidArgument: Expected 2 or 3 buffers, got {}",
219+
buffers.len()
220+
);
145221
}
146222

147223
fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
@@ -175,24 +251,6 @@ impl VTable for FSSTVTable {
175251
Ok(())
176252
}
177253

178-
fn append_to_builder(
179-
array: &FSSTArray,
180-
builder: &mut dyn ArrayBuilder,
181-
ctx: &mut ExecutionCtx,
182-
) -> VortexResult<()> {
183-
let Some(builder) = builder.as_any_mut().downcast_mut::<VarBinViewBuilder>() else {
184-
builder.extend_from_array(&array.to_array().execute::<Canonical>(ctx)?.into_array());
185-
return Ok(());
186-
};
187-
188-
// Decompress the whole block of data into a new buffer, and create some views
189-
// from it instead.
190-
let (buffers, views) = fsst_decode_views(array, builder.completed_block_count(), ctx)?;
191-
192-
builder.push_buffer_and_adjusted_views(&buffers, &views, array.validity_mask()?);
193-
Ok(())
194-
}
195-
196254
fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
197255
canonicalize_fsst(array, ctx)
198256
}
@@ -423,21 +481,39 @@ impl VisitorVTable<FSSTVTable> for FSSTVTable {
423481
"symbol_lengths",
424482
&BufferHandle::new_host(array.symbol_lengths().clone().into_byte_buffer()),
425483
);
484+
visitor.visit_buffer_handle("compressed_codes", array.codes.bytes_handle())
426485
}
427486

428487
/// Visits the children of `array`: the uncompressed lengths, then the offsets of the
/// compressed codes, then the validity of the codes.
///
/// The order is significant: `build` fetches these children by index (0, 1 and 2).
fn visit_children(array: &FSSTArray, visitor: &mut dyn ArrayChildVisitor) {
    let codes = &array.codes;

    visitor.visit_child("uncompressed_lengths", array.uncompressed_lengths());
    visitor.visit_child("codes_offsets", codes.offsets());
    visitor.visit_validity(codes.validity(), codes.len());
}
432492
}
433493

434494
#[cfg(test)]
435495
mod test {
496+
use fsst::Compressor;
497+
use fsst::Symbol;
498+
use vortex_array::Array;
499+
use vortex_array::IntoArray;
500+
use vortex_array::LEGACY_SESSION;
436501
use vortex_array::ProstMetadata;
502+
use vortex_array::VortexSessionExecute;
503+
use vortex_array::accessor::ArrayAccessor;
504+
use vortex_array::arrays::VarBinViewArray;
505+
use vortex_array::buffer::BufferHandle;
437506
use vortex_array::test_harness::check_metadata;
507+
use vortex_array::vtable::VTable;
508+
use vortex_buffer::Buffer;
509+
use vortex_dtype::DType;
510+
use vortex_dtype::Nullability;
438511
use vortex_dtype::PType;
512+
use vortex_error::VortexError;
439513

514+
use crate::FSSTVTable;
440515
use crate::array::FSSTMetadata;
516+
use crate::fsst_compress_iter;
441517

442518
#[cfg_attr(miri, ignore)]
443519
#[test]
@@ -446,7 +522,79 @@ mod test {
446522
"fsst.metadata",
447523
ProstMetadata(FSSTMetadata {
448524
uncompressed_lengths_ptype: PType::U64 as i32,
525+
codes_offsets_ptype: PType::I32 as i32,
449526
}),
450527
);
451528
}
529+
530+
/// Backward-compatibility check for the legacy serialized layout.
///
/// The original FSST array stored its codes as a VarBinArray child and required that the
/// child have this encoding. Vortex forbids this kind of introspection, so the array now
/// stores the compressed offsets and the compressed data buffer separately, and only uses
/// VarBinArray to delegate behavior.
///
/// This test manually constructs an old-style FSST array and ensures that it can still be
/// deserialized.
#[test]
fn test_back_compat() {
    let dtype = DType::Utf8(Nullability::NonNullable);
    let inputs: [&[u8]; 2] = [b"abcabcab", b"defghijk"];

    let symbols = Buffer::<Symbol>::copy_from([
        Symbol::from_slice(b"abc00000"),
        Symbol::from_slice(b"defghijk"),
    ]);
    let symbol_lengths = Buffer::<u8>::copy_from([3, 8]);

    let compressor = Compressor::rebuild_from(symbols.as_slice(), symbol_lengths.as_slice());
    let fsst_array = fsst_compress_iter(
        [Some(inputs[0]), Some(inputs[1])].into_iter(),
        2,
        dtype.clone(),
        &compressor,
    );

    // The legacy layout had two buffers:
    // 1. The 8 byte symbols
    // 2. The symbol lengths as u8.
    let buffers = [
        BufferHandle::new_host(symbols.into_byte_buffer()),
        BufferHandle::new_host(symbol_lengths.into_byte_buffer()),
    ];

    // The legacy layout had 2 children:
    // 1. The compressed codes, stored as a VarBinArray.
    // 2. The uncompressed lengths, stored as a Primitive array.
    let compressed_codes = fsst_array.codes().clone();
    let children = vec![
        compressed_codes.into_array(),
        fsst_array.uncompressed_lengths().clone(),
    ];

    let legacy_metadata = ProstMetadata(FSSTMetadata {
        uncompressed_lengths_ptype: fsst_array
            .uncompressed_lengths()
            .dtype()
            .as_ptype()
            .into(),
        // Legacy array did not store this field, use Protobuf default of 0.
        codes_offsets_ptype: 0,
    });

    let fsst = FSSTVTable::build(
        &dtype,
        2,
        &legacy_metadata,
        &buffers,
        &children.as_slice(),
    )
    .unwrap();

    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    let decompressed = fsst
        .into_array()
        .execute::<VarBinViewArray>(&mut ctx)
        .unwrap();

    decompressed
        .with_iterator(|it| {
            for expected in inputs {
                assert_eq!(it.next().unwrap(), Some(expected));
            }
            Ok::<_, VortexError>(())
        })
        .unwrap();
}
452600
}

vortex-array/src/serde.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,19 @@ pub trait ArrayChildren {
269269
}
270270
}
271271

272+
impl ArrayChildren for &[ArrayRef] {
273+
fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
274+
let array = self[index].clone();
275+
assert_eq!(array.len(), len);
276+
assert_eq!(array.dtype(), dtype);
277+
Ok(array)
278+
}
279+
280+
fn len(&self) -> usize {
281+
<[_]>::len(self)
282+
}
283+
}
284+
272285
/// [`ArrayParts`] represents a parsed but not-yet-decoded deserialized [`Array`].
273286
/// It contains all the information from the serialized form, without anything extra. i.e.
274287
/// it is missing a [`DType`] and `len`, and the `encoding_id` is not yet resolved to a concrete

0 commit comments

Comments
 (0)