Skip to content

Commit 278e085

Browse files
authored
Only trigger a datafusion update when the relevant part of the blueprint changes (#9916)
### Related

* In prep for #9795

### What

Another pure refactor to express the fact that only _part_ of a `TableBlueprint` influences/is handled by the datafusion query. Currently, that "part" is 100% of the table blueprint, but I want to add new stuff to it that does not concern datafusion. This PR makes this separation of concerns more explicit.
1 parent 54ddc8c commit 278e085

File tree

1 file changed

+46
-12
lines changed

1 file changed

+46
-12
lines changed

crates/viewer/re_dataframe_ui/src/datafusion_adapter.rs

+46-12
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use parking_lot::Mutex;
99
use re_sorbet::{BatchType, SorbetBatch};
1010
use re_viewer_context::AsyncRuntimeHandle;
1111

12-
use crate::table_blueprint::TableBlueprint;
12+
use crate::table_blueprint::{PartitionLinksSpec, SortBy, TableBlueprint};
1313
use crate::RequestedObject;
1414

1515
/// Make sure we escape column names correctly for datafusion.
@@ -24,25 +24,50 @@ fn col(name: &str) -> datafusion::logical_expr::Expr {
2424
datafusion_col(format!("{name:?}"))
2525
}
2626

27+
/// The subset of [`TableBlueprint`] that is actually handled by datafusion.
28+
///
29+
/// In general, there are aspects of a table blueprint that are handled by the UI in an immediate
30+
/// mode fashion (e.g. is a column visible?), and other aspects that are handled by datafusion (e.g.
31+
/// sorting). This struct is for the latter.
32+
#[derive(Debug, Clone, PartialEq, Eq, Default)]
33+
struct DataFusionQueryData {
34+
pub sort_by: Option<SortBy>,
35+
pub partition_links: Option<PartitionLinksSpec>,
36+
}
37+
38+
impl From<&TableBlueprint> for DataFusionQueryData {
39+
fn from(value: &TableBlueprint) -> Self {
40+
let TableBlueprint {
41+
sort_by,
42+
partition_links,
43+
} = value;
44+
45+
Self {
46+
sort_by: sort_by.clone(),
47+
partition_links: partition_links.clone(),
48+
}
49+
}
50+
}
51+
2752
/// A table blueprint along with the context required to execute the corresponding datafusion query.
2853
#[derive(Clone)]
2954
struct DataFusionQuery {
3055
session_ctx: Arc<SessionContext>,
3156
table_ref: TableReference,
3257

33-
blueprint: TableBlueprint,
58+
query_data: DataFusionQueryData,
3459
}
3560

3661
impl DataFusionQuery {
3762
fn new(
3863
session_ctx: Arc<SessionContext>,
3964
table_ref: TableReference,
40-
blueprint: TableBlueprint,
65+
query_data: DataFusionQueryData,
4166
) -> Self {
4267
Self {
4368
session_ctx,
4469
table_ref,
45-
blueprint,
70+
query_data,
4671
}
4772
}
4873

@@ -53,10 +78,10 @@ impl DataFusionQuery {
5378
async fn execute(self) -> Result<Vec<SorbetBatch>, DataFusionError> {
5479
let mut dataframe = self.session_ctx.table(self.table_ref).await?;
5580

56-
let TableBlueprint {
81+
let DataFusionQueryData {
5782
sort_by,
5883
partition_links,
59-
} = &self.blueprint;
84+
} = &self.query_data;
6085

6186
// Important: this needs to happen first, in case we sort/filter/etc. based on that
6287
// particular column.
@@ -103,12 +128,12 @@ impl PartialEq for DataFusionQuery {
103128
let Self {
104129
session_ctx,
105130
table_ref,
106-
blueprint,
131+
query_data,
107132
} = self;
108133

109134
Arc::ptr_eq(session_ctx, &other.session_ctx)
110135
&& table_ref == &other.table_ref
111-
&& blueprint == &other.blueprint
136+
&& query_data == &other.query_data
112137
}
113138
}
114139

@@ -119,6 +144,9 @@ type RequestedSorbetBatches = RequestedObject<Result<Vec<SorbetBatch>, DataFusio
119144
pub struct DataFusionAdapter {
120145
id: egui::Id,
121146

147+
/// The current table blueprint
148+
blueprint: TableBlueprint,
149+
122150
/// The query used to produce the dataframe.
123151
query: DataFusionQuery,
124152

@@ -149,10 +177,12 @@ impl DataFusionAdapter {
149177
let adapter = ui.data(|data| data.get_temp::<Self>(id));
150178

151179
let adapter = adapter.unwrap_or_else(|| {
152-
let query = DataFusionQuery::new(Arc::clone(session_ctx), table_ref, initial_blueprint);
180+
let initial_query = DataFusionQueryData::from(&initial_blueprint);
181+
let query = DataFusionQuery::new(Arc::clone(session_ctx), table_ref, initial_query);
153182

154183
let table_state = Self {
155184
id,
185+
blueprint: initial_blueprint,
156186
requested_sorbet_batches: Arc::new(Mutex::new(RequestedObject::new_with_repaint(
157187
runtime,
158188
ui.ctx().clone(),
@@ -175,7 +205,7 @@ impl DataFusionAdapter {
175205
}
176206

177207
pub fn blueprint(&self) -> &TableBlueprint {
178-
&self.query.blueprint
208+
&self.blueprint
179209
}
180210

181211
/// Update the query and save the state to egui's memory.
@@ -188,8 +218,12 @@ impl DataFusionAdapter {
188218
ui: &egui::Ui,
189219
new_blueprint: TableBlueprint,
190220
) {
191-
if self.query.blueprint != new_blueprint {
192-
self.query.blueprint = new_blueprint;
221+
self.blueprint = new_blueprint;
222+
223+
// retrigger a new datafusion query if required.
224+
let new_query_data = DataFusionQueryData::from(&self.blueprint);
225+
if self.query.query_data != new_query_data {
226+
self.query.query_data = new_query_data;
193227

194228
let mut dataframe = self.requested_sorbet_batches.lock();
195229

0 commit comments

Comments
 (0)