Skip to content

Commit a289c23

Browse files
authored
fix[gpu]: decode extension-typed columns on GPU (#8353)
Fixes #8143 Signed-off-by: Alexander Droste <alexander.droste@protonmail.com>
1 parent 1082a5d commit a289c23

2 files changed

Lines changed: 99 additions & 3 deletions

File tree

vortex-cuda/src/executor.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@ use vortex::array::ArrayVTable;
2121
use vortex::array::Canonical;
2222
use vortex::array::ExecutionCtx;
2323
use vortex::array::IntoArray;
24+
use vortex::array::arrays::Extension;
25+
use vortex::array::arrays::ExtensionArray;
2426
use vortex::array::arrays::Struct;
2527
use vortex::array::arrays::StructArray;
28+
use vortex::array::arrays::extension::ExtensionArrayExt;
2629
use vortex::array::arrays::struct_::StructDataParts;
2730
use vortex::array::buffer::BufferHandle;
2831
use vortex::dtype::PType;
@@ -412,6 +415,18 @@ impl CudaArrayExt for ArrayRef {
412415
)));
413416
}
414417

418+
// Extension arrays match AnyCanonical regardless of how their storage
419+
// is encoded, so the canonical early-return below would skip them with
420+
// the storage still compressed. Recurse into the storage so compressed
421+
// storage decodes on the GPU.
422+
if let Some(ext) = self.as_opt::<Extension>() {
423+
let storage = ext.storage_array().clone().execute_cuda(ctx).await?;
424+
return Ok(Canonical::Extension(ExtensionArray::new(
425+
ext.ext_dtype().clone(),
426+
storage.into_array(),
427+
)));
428+
}
429+
415430
if self.is_canonical() || self.is_empty() {
416431
trace!(encoding = ?self.encoding_id(), "skipping canonical");
417432
return self.execute(&mut ctx.ctx);

vortex-cuda/src/hybrid_dispatch/mod.rs

Lines changed: 84 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,24 +151,47 @@ pub async fn try_gpu_dispatch(
151151

152152
#[cfg(test)]
153153
mod tests {
154+
use rstest::rstest;
155+
use vortex::array::ArrayRef;
154156
use vortex::array::IntoArray;
157+
use vortex::array::arrays::ExtensionArray;
155158
use vortex::array::arrays::PrimitiveArray;
159+
use vortex::array::arrays::extension::ExtensionArrayExt;
156160
use vortex::array::assert_arrays_eq;
161+
use vortex::array::extension::datetime::Date;
162+
use vortex::array::extension::datetime::TimeUnit;
163+
use vortex::array::extension::datetime::Timestamp;
157164
use vortex::array::validity::Validity::NonNullable;
158165
use vortex::buffer::Buffer;
166+
use vortex::dtype::NativePType;
167+
use vortex::dtype::Nullability;
159168
use vortex::encodings::fastlanes::BitPacked;
160169
use vortex::encodings::fastlanes::FoR;
161170
use vortex::error::VortexExpect;
162171
use vortex::error::VortexResult;
163172
use vortex::mask::Mask;
173+
use vortex::scalar::Scalar;
164174
use vortex::session::VortexSession;
165175
use vortex_array::LEGACY_SESSION;
166176
use vortex_array::VortexSessionExecute;
167177

168178
use crate::CanonicalCudaExt;
179+
use crate::canonicalize_cpu;
169180
use crate::executor::CudaArrayExt;
170181
use crate::session::CudaSession;
171182

183+
fn for_bp<T: NativePType + Into<Scalar>>(values: Vec<T>, reference: T) -> ArrayRef {
184+
let bp = BitPacked::encode(
185+
&PrimitiveArray::new(Buffer::from(values), NonNullable).into_array(),
186+
7,
187+
&mut LEGACY_SESSION.create_execution_ctx(),
188+
)
189+
.vortex_expect("bp");
190+
FoR::try_new(bp.into_array(), reference.into())
191+
.vortex_expect("for")
192+
.into_array()
193+
}
194+
172195
/// FoR(BitPacked) u32 — entire tree compiles into a single fused plan.
173196
#[crate::test]
174197
async fn test_fused() -> VortexResult<()> {
@@ -183,7 +206,7 @@ mod tests {
183206
.vortex_expect("bp");
184207
let arr = FoR::try_new(bp.into_array(), 1000u32.into()).vortex_expect("for");
185208

186-
let cpu = crate::canonicalize_cpu(arr.clone())?.into_array();
209+
let cpu = canonicalize_cpu(arr.clone())?.into_array();
187210
let gpu = arr
188211
.into_array()
189212
.execute_cuda(&mut ctx)
@@ -219,7 +242,7 @@ mod tests {
219242
None,
220243
)?;
221244

222-
let cpu = crate::canonicalize_cpu(alp.clone())?.into_array();
245+
let cpu = canonicalize_cpu(alp.clone())?.into_array();
223246
let gpu = alp
224247
.into_array()
225248
.execute_cuda(&mut ctx)
@@ -257,7 +280,7 @@ mod tests {
257280
.unwrap();
258281
let arr = ALP::try_new(encoded, Exponents { e: 0, f: 2 }, Some(patches))?;
259282

260-
let cpu = crate::canonicalize_cpu(arr.clone())?.into_array();
283+
let cpu = canonicalize_cpu(arr.clone())?.into_array();
261284
let gpu = arr
262285
.into_array()
263286
.execute_cuda(&mut ctx)
@@ -366,4 +389,62 @@ mod tests {
366389
assert_arrays_eq!(cpu, gpu);
367390
Ok(())
368391
}
392+
393+
/// Ext(FoR(BitPacked)) — extension-typed columns must decode their
394+
/// compressed storage on the GPU instead of being treated as already
395+
/// canonical (#8143).
396+
#[rstest]
397+
#[case::date(ExtensionArray::new(
398+
Date::new(TimeUnit::Days, Nullability::NonNullable).erased(),
399+
for_bp((0..2048i32).map(|i| i % 128).collect(), 10_000i32),
400+
))]
401+
#[case::timestamp(ExtensionArray::new(
402+
Timestamp::new(TimeUnit::Microseconds, Nullability::NonNullable).erased(),
403+
for_bp((0..2048i64).map(|i| i % 128).collect(), 1_000_000i64),
404+
))]
405+
#[crate::test]
406+
async fn test_ext_storage_gpu_decode(#[case] ext: ExtensionArray) -> VortexResult<()> {
407+
let mut ctx =
408+
CudaSession::create_execution_ctx(&VortexSession::empty()).vortex_expect("ctx");
409+
410+
let expected_storage = canonicalize_cpu(ext.storage_array().clone())?.into_array();
411+
let expected = ExtensionArray::new(ext.ext_dtype().clone(), expected_storage).into_array();
412+
413+
let actual = ext.into_array().execute_cuda(&mut ctx).await?;
414+
let storage = actual.as_extension().storage_array();
415+
assert!(
416+
storage.is_canonical(),
417+
"storage still encoded as {}",
418+
storage.encoding_id()
419+
);
420+
assert!(!storage.is_host(), "storage was not decoded on the device");
421+
422+
let actual = actual.into_host().await?.into_array();
423+
assert_arrays_eq!(expected, actual);
424+
Ok(())
425+
}
426+
427+
/// Extension over already-canonical storage executes unchanged.
428+
#[crate::test]
429+
async fn test_ext_canonical_storage() -> VortexResult<()> {
430+
let mut ctx =
431+
CudaSession::create_execution_ctx(&VortexSession::empty()).vortex_expect("ctx");
432+
433+
let ext = ExtensionArray::new(
434+
Date::new(TimeUnit::Days, Nullability::NonNullable).erased(),
435+
PrimitiveArray::new(Buffer::from((0..128i32).collect::<Vec<_>>()), NonNullable)
436+
.into_array(),
437+
);
438+
439+
let actual = ext
440+
.clone()
441+
.into_array()
442+
.execute_cuda(&mut ctx)
443+
.await?
444+
.into_host()
445+
.await?
446+
.into_array();
447+
assert_arrays_eq!(ext.into_array(), actual);
448+
Ok(())
449+
}
369450
}

0 commit comments

Comments
 (0)