Skip to content

Commit 0cb0746

Browse files
authored
feat: Hold string cache in new streaming engine and fix row-encoding (#21039)
1 parent 35c71b2 commit 0cb0746

File tree

33 files changed

+136
-75
lines changed

33 files changed

+136
-75
lines changed

crates/polars-core/src/chunked_array/ops/row_encode.rs

Lines changed: 71 additions & 52 deletions
Original file line number | Diff line number | Diff line change
@@ -74,7 +74,7 @@ pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
7474
///
7575
/// This should be given the logical type in order to communicate Polars datatype information down
7676
/// into the row encoding / decoding.
77-
pub fn get_row_encoding_context(dtype: &DataType) -> Option<RowEncodingContext> {
77+
pub fn get_row_encoding_context(dtype: &DataType, ordered: bool) -> Option<RowEncodingContext> {
7878
match dtype {
7979
DataType::Boolean
8080
| DataType::UInt8
@@ -108,67 +108,86 @@ pub fn get_row_encoding_context(dtype: &DataType) -> Option<RowEncodingContext>
108108
},
109109

110110
#[cfg(feature = "dtype-array")]
111-
DataType::Array(dtype, _) => get_row_encoding_context(dtype),
112-
DataType::List(dtype) => get_row_encoding_context(dtype),
111+
DataType::Array(dtype, _) => get_row_encoding_context(dtype, ordered),
112+
DataType::List(dtype) => get_row_encoding_context(dtype, ordered),
113113
#[cfg(feature = "dtype-categorical")]
114114
DataType::Categorical(revmap, ordering) | DataType::Enum(revmap, ordering) => {
115-
let revmap = revmap.as_ref().unwrap();
116-
117-
let (num_known_categories, lexical_sort_idxs) = match revmap.as_ref() {
118-
RevMapping::Global(map, _, _) => {
119-
let num_known_categories = map.keys().max().copied().map_or(0, |m| m + 1);
120-
121-
// @TODO: This should probably be cached.
122-
let lexical_sort_idxs =
123-
matches!(ordering, CategoricalOrdering::Lexical).then(|| {
124-
let read_map = crate::STRING_CACHE.read_map();
125-
let payloads = read_map.get_current_payloads();
126-
assert!(payloads.len() >= num_known_categories as usize);
127-
128-
let mut idxs = (0..num_known_categories).collect::<Vec<u32>>();
129-
idxs.sort_by_key(|&k| payloads[k as usize].as_str());
130-
let mut sort_idxs = vec![0; num_known_categories as usize];
131-
for (i, idx) in idxs.into_iter().enumerate_u32() {
132-
sort_idxs[idx as usize] = i;
133-
}
134-
sort_idxs
135-
});
136-
137-
(num_known_categories, lexical_sort_idxs)
115+
let is_enum = dtype.is_enum();
116+
let ctx = match revmap {
117+
Some(revmap) => {
118+
let (num_known_categories, lexical_sort_idxs) = match revmap.as_ref() {
119+
RevMapping::Global(map, _, _) => {
120+
let num_known_categories =
121+
map.keys().max().copied().map_or(0, |m| m + 1);
122+
123+
// @TODO: This should probably be cached.
124+
let lexical_sort_idxs = (ordered
125+
&& matches!(ordering, CategoricalOrdering::Lexical))
126+
.then(|| {
127+
let read_map = crate::STRING_CACHE.read_map();
128+
let payloads = read_map.get_current_payloads();
129+
assert!(payloads.len() >= num_known_categories as usize);
130+
131+
let mut idxs = (0..num_known_categories).collect::<Vec<u32>>();
132+
idxs.sort_by_key(|&k| payloads[k as usize].as_str());
133+
let mut sort_idxs = vec![0; num_known_categories as usize];
134+
for (i, idx) in idxs.into_iter().enumerate_u32() {
135+
sort_idxs[idx as usize] = i;
136+
}
137+
sort_idxs
138+
});
139+
140+
(num_known_categories, lexical_sort_idxs)
141+
},
142+
RevMapping::Local(values, _) => {
143+
// @TODO: This should probably be cached.
144+
let lexical_sort_idxs = (ordered
145+
&& matches!(ordering, CategoricalOrdering::Lexical))
146+
.then(|| {
147+
assert_eq!(values.null_count(), 0);
148+
let values: Vec<&str> = values.values_iter().collect();
149+
150+
let mut idxs = (0..values.len() as u32).collect::<Vec<u32>>();
151+
idxs.sort_by_key(|&k| values[k as usize]);
152+
let mut sort_idxs = vec![0; values.len()];
153+
for (i, idx) in idxs.into_iter().enumerate_u32() {
154+
sort_idxs[idx as usize] = i;
155+
}
156+
sort_idxs
157+
});
158+
159+
(values.len() as u32, lexical_sort_idxs)
160+
},
161+
};
162+
163+
RowEncodingCategoricalContext {
164+
num_known_categories,
165+
is_enum,
166+
lexical_sort_idxs,
167+
}
138168
},
139-
RevMapping::Local(values, _) => {
140-
// @TODO: This should probably be cached.
141-
let lexical_sort_idxs =
142-
matches!(ordering, CategoricalOrdering::Lexical).then(|| {
143-
assert_eq!(values.null_count(), 0);
144-
let values: Vec<&str> = values.values_iter().collect();
145-
146-
let mut idxs = (0..values.len() as u32).collect::<Vec<u32>>();
147-
idxs.sort_by_key(|&k| values[k as usize]);
148-
let mut sort_idxs = vec![0; values.len()];
149-
for (i, idx) in idxs.into_iter().enumerate_u32() {
150-
sort_idxs[idx as usize] = i;
151-
}
152-
sort_idxs
153-
});
154-
155-
(values.len() as u32, lexical_sort_idxs)
169+
None => {
170+
let num_known_categories = u32::MAX;
171+
172+
if matches!(ordering, CategoricalOrdering::Lexical) && ordered {
173+
panic!("lexical ordering not yet supported if rev-map not given");
174+
}
175+
RowEncodingCategoricalContext {
176+
num_known_categories,
177+
is_enum,
178+
lexical_sort_idxs: None,
179+
}
156180
},
157181
};
158182

159-
let ctx = RowEncodingCategoricalContext {
160-
num_known_categories,
161-
is_enum: matches!(dtype, DataType::Enum(_, _)),
162-
lexical_sort_idxs,
163-
};
164183
Some(RowEncodingContext::Categorical(ctx))
165184
},
166185
#[cfg(feature = "dtype-struct")]
167186
DataType::Struct(fs) => {
168187
let mut ctxts = Vec::new();
169188

170189
for (i, f) in fs.iter().enumerate() {
171-
if let Some(ctxt) = get_row_encoding_context(f.dtype()) {
190+
if let Some(ctxt) = get_row_encoding_context(f.dtype(), ordered) {
172191
ctxts.reserve(fs.len());
173192
ctxts.extend(std::iter::repeat_n(None, i));
174193
ctxts.push(Some(ctxt));
@@ -183,7 +202,7 @@ pub fn get_row_encoding_context(dtype: &DataType) -> Option<RowEncodingContext>
183202
ctxts.extend(
184203
fs[ctxts.len()..]
185204
.iter()
186-
.map(|f| get_row_encoding_context(f.dtype())),
205+
.map(|f| get_row_encoding_context(f.dtype(), ordered)),
187206
);
188207

189208
Some(RowEncodingContext::Struct(ctxts))
@@ -214,7 +233,7 @@ pub fn _get_rows_encoded_unordered(by: &[Column]) -> PolarsResult<RowsEncoded> {
214233
let by = by.as_materialized_series();
215234
let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
216235
let opt = RowEncodingOptions::new_unsorted();
217-
let ctxt = get_row_encoding_context(by.dtype());
236+
let ctxt = get_row_encoding_context(by.dtype(), false);
218237

219238
cols.push(arr);
220239
opts.push(opt);
@@ -245,7 +264,7 @@ pub fn _get_rows_encoded(
245264
let by = by.as_materialized_series();
246265
let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
247266
let opt = RowEncodingOptions::new_sorted(*desc, *null_last);
248-
let ctxt = get_row_encoding_context(by.dtype());
267+
let ctxt = get_row_encoding_context(by.dtype(), true);
249268

250269
cols.push(arr);
251270
opts.push(opt);

crates/polars-core/src/series/mod.rs

Lines changed: 20 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -509,15 +509,26 @@ impl Series {
509509
},
510510

511511
#[cfg(feature = "dtype-categorical")]
512-
(D::UInt32, D::Categorical(revmap, ordering)) => Ok(unsafe {
513-
CategoricalChunked::from_cats_and_rev_map_unchecked(
514-
self.u32().unwrap().clone(),
515-
revmap.as_ref().unwrap().clone(),
516-
false,
517-
*ordering,
518-
)
519-
}
520-
.into_series()),
512+
(D::UInt32, D::Categorical(revmap, ordering)) => match revmap {
513+
Some(revmap) => Ok(unsafe {
514+
CategoricalChunked::from_cats_and_rev_map_unchecked(
515+
self.u32().unwrap().clone(),
516+
revmap.clone(),
517+
false,
518+
*ordering,
519+
)
520+
}
521+
.into_series()),
522+
// In the streaming engine this is `None` and the global string cache is turned on
523+
// for the duration of the query.
524+
None => Ok(unsafe {
525+
CategoricalChunked::from_global_indices_unchecked(
526+
self.u32().unwrap().clone(),
527+
*ordering,
528+
)
529+
.into_series()
530+
}),
531+
},
521532
#[cfg(feature = "dtype-categorical")]
522533
(D::UInt32, D::Enum(revmap, ordering)) => Ok(unsafe {
523534
CategoricalChunked::from_cats_and_rev_map_unchecked(

crates/polars-expr/src/groups/row_encoded.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,7 @@ impl RowEncodedHashGrouper {
4242
let ctxts = self
4343
.key_schema
4444
.iter()
45-
.map(|(_, dt)| get_row_encoding_context(dt))
45+
.map(|(_, dt)| get_row_encoding_context(dt, false))
4646
.collect::<Vec<_>>();
4747
let fields = vec![RowEncodingOptions::new_unsorted(); key_dtypes.len()];
4848
let key_columns =

crates/polars-lazy/src/frame/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -868,6 +868,7 @@ impl LazyFrame {
868868
payload,
869869
});
870870

871+
let _hold = StringCacheHolder::hold();
871872
let f = || {
872873
polars_stream::run_query(stream_lp_top, alp_plan.lp_arena, &mut alp_plan.expr_arena)
873874
};

crates/polars-pipe/src/executors/sinks/group_by/generic/eval.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -77,7 +77,7 @@ impl Eval {
7777
let mut dicts = Vec::with_capacity(self.key_columns_expr.len());
7878
for phys_e in self.key_columns_expr.iter() {
7979
let s = phys_e.evaluate(chunk, &context.execution_state)?;
80-
dicts.push(get_row_encoding_context(s.dtype()));
80+
dicts.push(get_row_encoding_context(s.dtype(), false));
8181
let s = s.to_physical_repr().into_owned();
8282
let s = prepare_key(&s, chunk);
8383
keys_columns.push(s.to_arrow(0, CompatLevel::newest()));

crates/polars-pipe/src/executors/sinks/group_by/generic/hash_table.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -262,7 +262,7 @@ impl<const FIXED: bool> AggHashTable<FIXED> {
262262
.output_schema
263263
.iter_values()
264264
.take(self.num_keys)
265-
.map(get_row_encoding_context)
265+
.map(|dt| get_row_encoding_context(dt, false))
266266
.collect::<Vec<_>>();
267267
let fields = vec![Default::default(); self.num_keys];
268268
let key_columns =

crates/polars-pipe/src/executors/sinks/joins/generic_build.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -142,7 +142,7 @@ impl<K: ExtraPayload> GenericBuild<K> {
142142
let s = phys_e.evaluate(chunk, &context.execution_state)?;
143143
let arr = s.to_physical_repr().rechunk().array_ref(0).clone();
144144
self.join_columns.push(arr);
145-
ctxts.push(get_row_encoding_context(s.dtype()));
145+
ctxts.push(get_row_encoding_context(s.dtype(), false));
146146
}
147147
let rows_encoded = polars_row::convert_columns_no_order(
148148
self.join_columns[0].len(), // @NOTE: does not work for ZFS

crates/polars-pipe/src/executors/sinks/joins/row_values.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -60,7 +60,7 @@ impl RowValues {
6060
names.push(s.name().to_string());
6161
}
6262
self.join_columns_material.push(s.array_ref(0).clone());
63-
ctxts.push(get_row_encoding_context(s.dtype()));
63+
ctxts.push(get_row_encoding_context(s.dtype(), false));
6464
}
6565

6666
// We determine the indices of the columns that have to be removed

crates/polars-pipe/src/executors/sinks/sort/sink_multiple.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -142,7 +142,7 @@ impl SortSinkMultiple {
142142
.iter()
143143
.map(|i| {
144144
let (_, dtype) = schema.get_at_index(*i).unwrap();
145-
get_row_encoding_context(dtype)
145+
get_row_encoding_context(dtype, true)
146146
})
147147
.collect::<Vec<_>>();
148148

crates/polars-python/src/series/general.rs

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -484,7 +484,9 @@ impl PySeries {
484484

485485
let dicts = dtypes
486486
.iter()
487-
.map(|(_, dtype)| get_row_encoding_context(&dtype.0))
487+
.map(|(_, dt)| dt)
488+
.zip(opts.iter())
489+
.map(|(dtype, opts)| get_row_encoding_context(&dtype.0, opts.is_ordered()))
488490
.collect::<Vec<_>>();
489491

490492
// Get the BinaryOffset array.

0 commit comments

Comments (0)