Skip to content

Commit 54ddc8c

Browse files
authored
Relax SorbetColumnDescriptors and add ChunkColumnDescriptors (#9934)
### Related

* Closes #9855

### What

`SorbetColumnDescriptors` no longer enforces the order of columns. Instead there is a new `ChunkColumnDescriptors` that enforces this.
1 parent 19a43ed commit 54ddc8c

File tree

16 files changed

+360
-351
lines changed

16 files changed

+360
-351
lines changed

crates/store/re_chunk_store/src/dataframe.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ use itertools::Itertools as _;
1414
use re_chunk::{LatestAtQuery, RangeQuery, TimelineName};
1515
use re_log_types::{EntityPath, ResolvedTimeRange, TimeInt, Timeline};
1616
use re_sorbet::{
17-
ColumnDescriptor, ColumnSelector, ComponentColumnDescriptor, ComponentColumnSelector,
18-
IndexColumnDescriptor, SorbetColumnDescriptors, TimeColumnSelector,
17+
ChunkColumnDescriptors, ColumnDescriptor, ColumnSelector, ComponentColumnDescriptor,
18+
ComponentColumnSelector, IndexColumnDescriptor, TimeColumnSelector,
1919
};
2020
use re_types_core::{ComponentDescriptor, ComponentName};
2121
use tap::Tap as _;
@@ -306,7 +306,7 @@ impl ChunkStore {
306306
/// The columns are guaranteed to be in a specific order:
307307
/// * first, the time columns in lexical order (`frame_nr`, `log_time`, ...);
308308
/// * second, the component columns in lexical order (`Color`, `Radius`, ...).
309-
pub fn schema(&self) -> SorbetColumnDescriptors {
309+
pub fn schema(&self) -> ChunkColumnDescriptors {
310310
re_tracing::profile_function!();
311311

312312
let indices = self
@@ -360,8 +360,8 @@ impl ChunkStore {
360360
.collect_vec()
361361
.tap_mut(|components| components.sort());
362362

363-
SorbetColumnDescriptors {
364-
row_id: Some(self.row_id_descriptor()),
363+
ChunkColumnDescriptors {
364+
row_id: self.row_id_descriptor(),
365365
indices,
366366
components,
367367
}
@@ -483,7 +483,7 @@ impl ChunkStore {
483483
/// The columns are guaranteed to be in a specific order:
484484
/// * first, the time columns in lexical order (`frame_nr`, `log_time`, ...);
485485
/// * second, the component columns in lexical order (`Color`, `Radius`, ...).
486-
pub fn schema_for_query(&self, query: &QueryExpression) -> SorbetColumnDescriptors {
486+
pub fn schema_for_query(&self, query: &QueryExpression) -> ChunkColumnDescriptors {
487487
re_tracing::profile_function!();
488488

489489
let QueryExpression {

crates/store/re_dataframe/src/engine.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use re_chunk::EntityPath;
44
use re_chunk_store::{ChunkStore, ChunkStoreConfig, ChunkStoreHandle, QueryExpression};
55
use re_log_types::{EntityPathFilter, StoreId};
66
use re_query::{QueryCache, QueryCacheHandle, StorageEngine, StorageEngineLike};
7-
use re_sorbet::SorbetColumnDescriptors;
7+
use re_sorbet::ChunkColumnDescriptors;
88

99
use crate::QueryHandle;
1010

@@ -69,7 +69,7 @@ impl<E: StorageEngineLike + Clone> QueryEngine<E> {
6969
/// * first, the time columns in lexical order (`frame_nr`, `log_time`, ...);
7070
/// * second, the component columns in lexical order (`Color`, `Radius`, ...).
7171
#[inline]
72-
pub fn schema(&self) -> SorbetColumnDescriptors {
72+
pub fn schema(&self) -> ChunkColumnDescriptors {
7373
self.engine.with(|store, _cache| store.schema())
7474
}
7575

@@ -79,7 +79,7 @@ impl<E: StorageEngineLike + Clone> QueryEngine<E> {
7979
/// * first, the time columns in lexical order (`frame_nr`, `log_time`, ...);
8080
/// * second, the component columns in lexical order (`Color`, `Radius`, ...).
8181
#[inline]
82-
pub fn schema_for_query(&self, query: &QueryExpression) -> SorbetColumnDescriptors {
82+
pub fn schema_for_query(&self, query: &QueryExpression) -> ChunkColumnDescriptors {
8383
self.engine
8484
.with(|store, _cache| store.schema_for_query(query))
8585
}

crates/store/re_dataframe/src/query.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use re_chunk_store::{
3232
use re_log_types::ResolvedTimeRange;
3333
use re_query::{QueryCache, StorageEngineLike};
3434
use re_sorbet::{
35-
ColumnSelector, ComponentColumnSelector, RowIdColumnDescriptor, SorbetColumnDescriptors,
35+
ChunkColumnDescriptors, ColumnSelector, ComponentColumnSelector, RowIdColumnDescriptor,
3636
TimeColumnSelector,
3737
};
3838
use re_types_core::{archetypes, arrow_helpers::as_array_ref, ComponentDescriptor, Loggable as _};
@@ -79,7 +79,7 @@ struct QueryHandleState {
7979
/// Describes the columns that make up this view.
8080
///
8181
/// See [`QueryExpression::view_contents`].
82-
view_contents: SorbetColumnDescriptors,
82+
view_contents: ChunkColumnDescriptors,
8383

8484
/// Describes the columns specifically selected to be returned from this view.
8585
///
@@ -683,7 +683,7 @@ impl<E: StorageEngineLike> QueryHandle<E> {
683683
///
684684
/// See [`QueryExpression::view_contents`].
685685
#[inline]
686-
pub fn view_contents(&self) -> &SorbetColumnDescriptors {
686+
pub fn view_contents(&self) -> &ChunkColumnDescriptors {
687687
&self.init().view_contents
688688
}
689689

crates/store/re_sorbet/src/chunk_batch.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,17 @@ impl From<&ChunkBatch> for ArrowRecordBatch {
138138
impl TryFrom<&ArrowRecordBatch> for ChunkBatch {
139139
type Error = SorbetError;
140140

141-
/// Will automatically wrap data columns in `ListArrays` if they are not already.
141+
/// Will perform some transformations:
142+
/// * Will automatically wrap data columns in `ListArrays` if they are not already
143+
/// * Will reorder columns so that Row ID comes before timelines, which come before data
144+
/// * Will migrate legacy data to more modern form
142145
fn try_from(batch: &ArrowRecordBatch) -> Result<Self, Self::Error> {
143146
re_tracing::profile_function!();
144147

148+
let batch = crate::migration::reorder_columns(batch);
149+
145150
Self::try_from(SorbetBatch::try_from_record_batch(
146-
batch,
151+
&batch,
147152
crate::BatchType::Chunk,
148153
)?)
149154
}
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
use arrow::datatypes::Fields as ArrowFields;
2+
3+
use re_log_types::EntityPath;
4+
5+
use crate::{
6+
ColumnDescriptor, ComponentColumnDescriptor, IndexColumnDescriptor, RowIdColumnDescriptor,
7+
SorbetColumnDescriptors, SorbetError,
8+
};
9+
10+
/// Requires a specific ordering of the columns.
11+
#[derive(Debug, Clone, PartialEq, Eq)]
12+
pub struct ChunkColumnDescriptors {
13+
/// The primary row id column.
14+
pub row_id: RowIdColumnDescriptor,
15+
16+
/// Index columns (timelines).
17+
pub indices: Vec<IndexColumnDescriptor>,
18+
19+
/// The actual component data
20+
pub components: Vec<ComponentColumnDescriptor>,
21+
}
22+
23+
impl ChunkColumnDescriptors {
24+
/// Debug-only sanity check.
25+
#[inline]
26+
#[track_caller]
27+
pub fn sanity_check(&self) {
28+
for component in &self.components {
29+
component.sanity_check();
30+
}
31+
}
32+
33+
/// Returns all indices and then all components;
34+
/// skipping the `row_id` column.
35+
///
36+
/// See also [`Self::get_index_or_component`].
37+
// TODO(#9922): stop ignoring row_id
38+
pub fn indices_and_components(&self) -> Vec<ColumnDescriptor> {
39+
itertools::chain!(
40+
self.indices.iter().cloned().map(ColumnDescriptor::Time),
41+
self.components
42+
.iter()
43+
.cloned()
44+
.map(ColumnDescriptor::Component),
45+
)
46+
.collect()
47+
}
48+
49+
/// Index the index- and component columns, ignoring the `row_id` column completely.
50+
///
51+
/// That is, `get_index_or_component(0)` will return the first index column (if any; otherwise
52+
/// the first component column).
53+
///
54+
/// See also [`Self::indices_and_components`].
55+
// TODO(#9922): stop ignoring row_id
56+
pub fn get_index_or_component(&self, index_ignoring_row_id: usize) -> Option<ColumnDescriptor> {
57+
if index_ignoring_row_id < self.indices.len() {
58+
Some(ColumnDescriptor::Time(
59+
self.indices[index_ignoring_row_id].clone(),
60+
))
61+
} else {
62+
self.components
63+
.get(index_ignoring_row_id - self.indices.len())
64+
.cloned()
65+
.map(ColumnDescriptor::Component)
66+
}
67+
}
68+
69+
/// Keep only the component columns that satisfy the given predicate.
70+
#[must_use]
71+
#[inline]
72+
pub fn filter_components(mut self, keep: impl Fn(&ComponentColumnDescriptor) -> bool) -> Self {
73+
self.components.retain(keep);
74+
self
75+
}
76+
}
77+
78+
impl ChunkColumnDescriptors {
79+
pub fn try_from_arrow_fields(
80+
chunk_entity_path: Option<&EntityPath>,
81+
fields: &ArrowFields,
82+
) -> Result<Self, SorbetError> {
83+
Self::try_from(SorbetColumnDescriptors::try_from_arrow_fields(
84+
chunk_entity_path,
85+
fields,
86+
)?)
87+
}
88+
}
89+
90+
impl TryFrom<SorbetColumnDescriptors> for ChunkColumnDescriptors {
91+
type Error = SorbetError;
92+
93+
fn try_from(columns: SorbetColumnDescriptors) -> Result<Self, Self::Error> {
94+
let SorbetColumnDescriptors { columns } = columns;
95+
96+
let mut row_ids = Vec::new();
97+
let mut indices = Vec::new();
98+
let mut components = Vec::new();
99+
100+
for column in &columns {
101+
match column.clone() {
102+
ColumnDescriptor::RowId(descr) => {
103+
if indices.is_empty() && components.is_empty() {
104+
row_ids.push(descr);
105+
} else {
106+
let err = format!(
107+
"RowId column must be the first column; but the columns were: {columns:?}"
108+
);
109+
return Err(SorbetError::InvalidColumnOrder(err));
110+
}
111+
}
112+
113+
ColumnDescriptor::Time(descr) => {
114+
if components.is_empty() {
115+
indices.push(descr);
116+
} else {
117+
return Err(SorbetError::InvalidColumnOrder(
118+
"Index columns must come before any data columns".to_owned(),
119+
));
120+
}
121+
}
122+
123+
ColumnDescriptor::Component(descr) => {
124+
components.push(descr);
125+
}
126+
}
127+
}
128+
129+
if row_ids.len() > 1 {
130+
return Err(SorbetError::MultipleRowIdColumns(row_ids.len()));
131+
}
132+
133+
let row_id = row_ids.pop().ok_or(SorbetError::MissingRowIdColumn)?;
134+
135+
Ok(Self {
136+
row_id,
137+
indices,
138+
components,
139+
})
140+
}
141+
}
142+
143+
impl From<ChunkColumnDescriptors> for SorbetColumnDescriptors {
144+
fn from(columns: ChunkColumnDescriptors) -> Self {
145+
let ChunkColumnDescriptors {
146+
row_id,
147+
indices,
148+
components,
149+
} = columns;
150+
151+
let columns = itertools::chain!(
152+
std::iter::once(ColumnDescriptor::RowId(row_id.clone())),
153+
indices.iter().cloned().map(ColumnDescriptor::Time),
154+
components.iter().cloned().map(ColumnDescriptor::Component),
155+
)
156+
.collect();
157+
158+
Self { columns }
159+
}
160+
}

crates/store/re_sorbet/src/chunk_schema.rs

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ use re_log_types::EntityPath;
66
use re_types_core::ChunkId;
77

88
use crate::{
9-
ArrowBatchMetadata, ComponentColumnDescriptor, IndexColumnDescriptor, RowIdColumnDescriptor,
9+
chunk_columns::ChunkColumnDescriptors, ArrowBatchMetadata, ColumnDescriptor,
10+
ComponentColumnDescriptor, IndexColumnDescriptor, RowIdColumnDescriptor,
1011
SorbetColumnDescriptors, SorbetError, SorbetSchema,
1112
};
1213

@@ -19,8 +20,8 @@ pub struct ChunkSchema {
1920
sorbet: SorbetSchema,
2021

2122
// Some things here are also in [`SorbetSchema`], but are duplicated
22-
// here because they are non-optional:
23-
row_id: RowIdColumnDescriptor,
23+
// here because they have additional constraints (e.g. ordering, non-optional):
24+
chunk_columns: ChunkColumnDescriptors,
2425
chunk_id: ChunkId,
2526
entity_path: EntityPath,
2627
}
@@ -53,15 +54,22 @@ impl ChunkSchema {
5354
Self {
5455
sorbet: SorbetSchema {
5556
columns: SorbetColumnDescriptors {
56-
row_id: Some(row_id.clone()),
57-
indices,
58-
components,
57+
columns: itertools::chain!(
58+
std::iter::once(ColumnDescriptor::RowId(row_id.clone())),
59+
indices.iter().cloned().map(ColumnDescriptor::Time),
60+
components.iter().cloned().map(ColumnDescriptor::Component),
61+
)
62+
.collect(),
5963
},
6064
chunk_id: Some(chunk_id),
6165
entity_path: Some(entity_path.clone()),
6266
heap_size_bytes: None,
6367
},
64-
row_id,
68+
chunk_columns: ChunkColumnDescriptors {
69+
row_id,
70+
indices,
71+
components,
72+
},
6573
chunk_id,
6674
entity_path,
6775
}
@@ -103,17 +111,7 @@ impl ChunkSchema {
103111

104112
#[inline]
105113
pub fn row_id_column(&self) -> &RowIdColumnDescriptor {
106-
&self.row_id
107-
}
108-
109-
#[inline]
110-
pub fn index_columns(&self) -> &[IndexColumnDescriptor] {
111-
&self.sorbet.columns.indices
112-
}
113-
114-
#[inline]
115-
pub fn component_columns(&self) -> &[ComponentColumnDescriptor] {
116-
&self.sorbet.columns.components
114+
&self.chunk_columns.row_id
117115
}
118116

119117
pub fn arrow_batch_metadata(&self) -> ArrowBatchMetadata {
@@ -148,20 +146,15 @@ impl TryFrom<SorbetSchema> for ChunkSchema {
148146

149147
fn try_from(sorbet_schema: SorbetSchema) -> Result<Self, Self::Error> {
150148
Ok(Self {
151-
row_id: sorbet_schema
152-
.columns
153-
.row_id
154-
.clone()
155-
.ok_or_else(|| SorbetError::custom("Missing row_id column"))?,
156-
chunk_id: sorbet_schema
157-
.chunk_id
158-
.ok_or_else(|| SorbetError::custom("Missing chunk_id"))?,
149+
sorbet: sorbet_schema.clone(),
150+
151+
chunk_columns: ChunkColumnDescriptors::try_from(sorbet_schema.columns.clone())?,
152+
153+
chunk_id: sorbet_schema.chunk_id.ok_or(SorbetError::MissingChunkId)?,
154+
159155
entity_path: sorbet_schema
160156
.entity_path
161-
.clone()
162-
.ok_or_else(|| SorbetError::custom("Missing entity_path"))?,
163-
164-
sorbet: sorbet_schema,
157+
.ok_or(SorbetError::MissingEntityPath)?,
165158
})
166159
}
167160
}

crates/store/re_sorbet/src/column_descriptor.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,23 @@ pub enum ColumnError {
2727
UnsupportedTimeType(#[from] crate::UnsupportedTimeType),
2828
}
2929

30-
// Describes any kind of column.
31-
//
32-
// See:
33-
// * [`IndexColumnDescriptor`]
34-
// * [`ComponentColumnDescriptor`]
35-
//TODO(#9034): This should support RowId as well, but this has ramifications on the dataframe API.
30+
/// Describes any kind of column.
31+
///
32+
/// See:
33+
/// * [`RowIdColumnDescriptor`]
34+
/// * [`IndexColumnDescriptor`]
35+
/// * [`ComponentColumnDescriptor`]
3636
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
3737
pub enum ColumnDescriptor {
38+
/// The primary row id column.
39+
///
40+
/// There should usually only be one of these.
3841
RowId(RowIdColumnDescriptor),
42+
43+
/// Index columns (timelines).
3944
Time(IndexColumnDescriptor),
45+
46+
/// The actual component data
4047
Component(ComponentColumnDescriptor),
4148
}
4249

0 commit comments

Comments
 (0)