Skip to content

fix(tesseract): Fix rolling window external pre-aggregation #9625

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,11 @@ export class BaseQuery {
*/
this.customSubQueryJoins = this.options.subqueryJoins ?? [];
this.useNativeSqlPlanner = this.options.useNativeSqlPlanner ?? getEnv('nativeSqlPlanner');
this.canUseNativeSqlPlannerPreAggregation = false;
this.canUseNativeSqlPlannerPreAggregation = true;
if (this.useNativeSqlPlanner && !this.neverUseSqlPlannerPreaggregation()) {
const hasMultiStageMeasures = this.fullKeyQueryAggregateMeasures({ hasMultipliedForPreAggregation: true }).multiStageMembers.length > 0;
this.canUseNativeSqlPlannerPreAggregation = hasMultiStageMeasures;
const fullAggregateMeasures = this.fullKeyQueryAggregateMeasures({ hasMultipliedForPreAggregation: true });

this.canUseNativeSqlPlannerPreAggregation = fullAggregateMeasures.multiStageMembers.length > 0 || fullAggregateMeasures.cumulativeMeasures.length > 0;
}
this.queryLevelJoinHints = this.options.joinHints ?? [];
this.prebuildJoin();
Expand Down Expand Up @@ -775,6 +776,13 @@ export class BaseQuery {
);
}

driverTools(external) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe name it something like queryForDriverTools or driverToolsQuery...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From Tesseract’s perspective, this is indeed driverTools. In the future, I plan to extract it from BaseQuery into a separate class. In other words, Tesseract receives this through an interface that only exposes the parts of BaseQuery responsible for driver-specific SQL generation.

if (external && !this.options.disableExternalPreAggregations && this.externalQueryClass) {
return this.externalQuery();
}
return this;
}

buildSqlAndParamsRust(exportAnnotatedSql) {
const order = this.options.order && R.pipe(
R.map((hash) => ((!hash || !hash.id) ? null : hash)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,4 +334,13 @@ export class CubeStoreQuery extends BaseQuery {
}
);
}

/**
 * Returns the inherited SQL templates with `statements.time_series_select` replaced.
 *
 * The replacement template emits one
 * `select to_timestamp(...) date_from, to_timestamp(...) date_to` row per entry
 * of `seria`, and puts `UNION ALL` after every row except the last one.
 */
public sqlTemplates() {
  const inherited = super.sqlTemplates();
  // Pieces are joined without a separator, so the result is the same single
  // string as a plain concatenation of the four literals.
  const timeSeriesSelect = [
    '{% for time_item in seria %}',
    'select to_timestamp(\'{{ time_item[0] }}\') date_from, to_timestamp(\'{{ time_item[1] }}\') date_to \n',
    '{% if not loop.last %} UNION ALL\n{% endif %}',
    '{% endfor %}',
  ].join('');
  inherited.statements.time_series_select = timeSeriesSelect;
  return inherited;
}
}
27 changes: 2 additions & 25 deletions rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_tools.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::base_query_options::FilterItem;
use super::driver_tools::{DriverTools, NativeDriverTools};
use super::filter_group::{FilterGroup, NativeFilterGroup};
use super::filter_params::{FilterParams, NativeFilterParams};
use super::pre_aggregation_obj::{NativePreAggregationObj, PreAggregationObj};
Expand All @@ -16,12 +17,7 @@ use std::rc::Rc;

#[nativebridge::native_bridge]
pub trait BaseTools {
fn convert_tz(&self, field: String) -> Result<String, CubeError>;
fn time_grouped_column(
&self,
granularity: String,
dimension: String,
) -> Result<String, CubeError>;
fn driver_tools(&self, external: bool) -> Result<Rc<dyn DriverTools>, CubeError>;
fn sql_templates(&self) -> Result<Rc<dyn SqlTemplatesRender>, CubeError>;
fn security_context_for_rust(&self) -> Result<Rc<dyn SecurityContext>, CubeError>;
fn sql_utils_for_rust(&self) -> Result<Rc<dyn SqlUtils>, CubeError>;
Expand All @@ -33,10 +29,6 @@ pub trait BaseTools {
&self,
used_filters: Option<Vec<FilterItem>>,
) -> Result<Rc<dyn FilterGroup>, CubeError>;
fn timestamp_precision(&self) -> Result<u32, CubeError>;
fn time_stamp_cast(&self, field: String) -> Result<String, CubeError>; //TODO move to templates
fn date_time_cast(&self, field: String) -> Result<String, CubeError>; //TODO move to templates
fn in_db_time_zone(&self, date: String) -> Result<String, CubeError>;
fn generate_time_series(
&self,
granularity: String,
Expand All @@ -49,23 +41,8 @@ pub trait BaseTools {
origin: String,
) -> Result<Vec<Vec<String>>, CubeError>;
fn get_allocated_params(&self) -> Result<Vec<String>, CubeError>;
fn subtract_interval(&self, date: String, interval: String) -> Result<String, CubeError>;
fn add_interval(&self, date: String, interval: String) -> Result<String, CubeError>;
fn add_timestamp_interval(&self, date: String, interval: String) -> Result<String, CubeError>;
fn all_cube_members(&self, path: String) -> Result<Vec<String>, CubeError>;
fn interval_and_minimal_time_unit(&self, interval: String) -> Result<Vec<String>, CubeError>;
//===== TODO Move to templates
fn hll_init(&self, sql: String) -> Result<String, CubeError>;
fn hll_merge(&self, sql: String) -> Result<String, CubeError>;
fn hll_cardinality_merge(&self, sql: String) -> Result<String, CubeError>;
fn count_distinct_approx(&self, sql: String) -> Result<String, CubeError>;
fn date_bin(
&self,
interval: String,
source: String,
origin: String,
) -> Result<String, CubeError>;

fn get_pre_aggregation_by_name(
&self,
cube_name: String,
Expand Down
39 changes: 39 additions & 0 deletions rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/driver_tools.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use super::sql_templates_render::{NativeSqlTemplatesRender, SqlTemplatesRender};
use cubenativeutils::wrappers::serializer::{
NativeDeserialize, NativeDeserializer, NativeSerialize,
};
use cubenativeutils::wrappers::NativeContextHolder;
use cubenativeutils::wrappers::NativeObjectHandle;
use cubenativeutils::CubeError;
use std::any::Any;
use std::rc::Rc;

// Bridge trait for the driver-specific helpers that the planner calls on the
// JS side. Every method is fallible and reports failures as `CubeError`.
// NOTE(review): `base_tools.rs` imports `NativeDriverTools` from this module, so
// the `native_bridge` attribute presumably generates that wrapper — confirm
// against the macro.
#[nativebridge::native_bridge]
pub trait DriverTools {
// NOTE(review): presumably returns `field` wrapped in the driver's time-zone
// conversion SQL — confirm against the JS implementation.
fn convert_tz(&self, field: String) -> Result<String, CubeError>;
// Takes a granularity name and a dimension expression and returns a string.
// NOTE(review): presumably the SQL that truncates `dimension` to
// `granularity` — confirm against the JS implementation.
fn time_grouped_column(
&self,
granularity: String,
dimension: String,
) -> Result<String, CubeError>;
// Returns a shared handle to the SQL template renderer.
fn sql_templates(&self) -> Result<Rc<dyn SqlTemplatesRender>, CubeError>;
// NOTE(review): unit/meaning of the returned number is not visible here
// (likely fractional-second digits) — confirm.
fn timestamp_precision(&self) -> Result<u32, CubeError>;
fn time_stamp_cast(&self, field: String) -> Result<String, CubeError>; //TODO move to templates
fn date_time_cast(&self, field: String) -> Result<String, CubeError>; //TODO move to templates
// NOTE(review): presumably converts `date` into the database time zone —
// confirm against the JS implementation.
fn in_db_time_zone(&self, date: String) -> Result<String, CubeError>;
// Returns the parameters allocated so far as a list of strings.
// NOTE(review): ordering/placeholder semantics are not visible here — confirm.
fn get_allocated_params(&self) -> Result<Vec<String>, CubeError>;
// Interval arithmetic helpers: each takes a date expression and an interval
// string and returns the resulting expression as a string.
// NOTE(review): the difference between `add_interval` and
// `add_timestamp_interval` is not visible here — confirm.
fn subtract_interval(&self, date: String, interval: String) -> Result<String, CubeError>;
fn add_interval(&self, date: String, interval: String) -> Result<String, CubeError>;
fn add_timestamp_interval(&self, date: String, interval: String) -> Result<String, CubeError>;
// Returns a list of strings describing `interval`.
// NOTE(review): the time-series planner appears to treat any length other
// than 2 as an internal error, so implementations should return exactly two
// elements — confirm the meaning of each element.
fn interval_and_minimal_time_unit(&self, interval: String) -> Result<Vec<String>, CubeError>;
// Helpers that take a SQL fragment and return a SQL fragment.
// NOTE(review): presumably HyperLogLog init/merge and approximate distinct
// count wrappers, judging by the names — confirm.
fn hll_init(&self, sql: String) -> Result<String, CubeError>;
fn hll_merge(&self, sql: String) -> Result<String, CubeError>;
fn hll_cardinality_merge(&self, sql: String) -> Result<String, CubeError>;
fn count_distinct_approx(&self, sql: String) -> Result<String, CubeError>;
// NOTE(review): presumably returns SQL that bins `source` into buckets of
// `interval` aligned to `origin` — confirm against the JS implementation.
fn date_bin(
&self,
interval: String,
source: String,
origin: String,
) -> Result<String, CubeError>;
}
1 change: 1 addition & 0 deletions rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod case_item;
pub mod case_label;
pub mod cube_definition;
pub mod dimension_definition;
pub mod driver_tools;
pub mod evaluator;
pub mod filter_group;
pub mod filter_params;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use super::*;
use crate::cube_bridge::pre_aggregation_obj::PreAggregationObj;
use crate::logical_plan::*;
use crate::plan::FilterItem;
use crate::planner::query_tools::QueryTools;
Expand Down Expand Up @@ -29,7 +28,7 @@ impl MatchState {

pub struct PreAggregationOptimizer {
query_tools: Rc<QueryTools>,
used_pre_aggregations: HashMap<(String, String), Rc<dyn PreAggregationObj>>,
used_pre_aggregations: HashMap<(String, String), Rc<PreAggregation>>,
}

impl PreAggregationOptimizer {
Expand Down Expand Up @@ -71,7 +70,7 @@ impl PreAggregationOptimizer {
Ok(None)
}

pub fn get_used_pre_aggregations(&self) -> Vec<Rc<dyn PreAggregationObj>> {
pub fn get_used_pre_aggregations(&self) -> Vec<Rc<PreAggregation>> {
self.used_pre_aggregations.values().cloned().collect()
}

Expand Down Expand Up @@ -445,15 +444,14 @@ impl PreAggregationOptimizer {
granularity: pre_aggregation.granularity.clone(),
table_name: table_name.clone(),
cube_name: pre_aggregation.cube_name.clone(),
pre_aggregation_obj,
};
let result = Rc::new(pre_aggregation);
self.used_pre_aggregations.insert(
(
pre_aggregation.cube_name.clone(),
pre_aggregation.name.clone(),
),
pre_aggregation_obj.clone(),
(result.cube_name.clone(), result.name.clone()),
result.clone(),
);
Ok(Rc::new(pre_aggregation))
Ok(result)
} else {
Err(CubeError::internal(format!(
"Cannot find pre aggregation object for cube {} and name {}",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use crate::cube_bridge::pre_aggregation_obj::PreAggregationObj;
use crate::planner::sql_evaluator::MemberSymbol;
use itertools::Itertools;
use std::rc::Rc;
Expand All @@ -13,6 +14,7 @@ pub struct PreAggregation {
pub granularity: Option<String>,
pub table_name: String,
pub cube_name: String,
pub pre_aggregation_obj: Rc<dyn PreAggregationObj>,
}

impl PrettyPrint for PreAggregation {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,14 @@ impl PhysicalPlanBuilderContext {

pub struct PhysicalPlanBuilder {
query_tools: Rc<QueryTools>,
_plan_sql_templates: PlanSqlTemplates,
plan_sql_templates: PlanSqlTemplates,
}

impl PhysicalPlanBuilder {
pub fn new(query_tools: Rc<QueryTools>) -> Self {
let plan_sql_templates = query_tools.plan_sql_templates();
pub fn new(query_tools: Rc<QueryTools>, plan_sql_templates: PlanSqlTemplates) -> Self {
Self {
query_tools,
_plan_sql_templates: plan_sql_templates,
plan_sql_templates,
}
}

Expand Down Expand Up @@ -114,6 +113,7 @@ impl PhysicalPlanBuilder {
) -> Result<Rc<Select>, CubeError> {
let mut render_references = HashMap::new();
let mut measure_references = HashMap::new();
let mut dimensions_references = HashMap::new();
let mut context_factory = context.make_sql_nodes_factory();
let from = match &logical_plan.source {
SimpleQuerySource::LogicalJoin(join) => self.process_logical_join(
Expand All @@ -126,8 +126,8 @@ impl PhysicalPlanBuilder {
let res = self.process_pre_aggregation(
pre_aggregation,
context,
&mut render_references,
&mut measure_references,
&mut dimensions_references,
)?;
for member in logical_plan.schema.time_dimensions.iter() {
context_factory.add_dimensions_with_ignored_timezone(member.full_name());
Expand All @@ -140,6 +140,7 @@ impl PhysicalPlanBuilder {
let mut select_builder = SelectBuilder::new(from);
context_factory.set_ungrouped(logical_plan.ungrouped);
context_factory.set_pre_aggregation_measures_references(measure_references);
context_factory.set_pre_aggregation_dimensions_references(dimensions_references);

let mut group_by = Vec::new();
for member in logical_plan.schema.dimensions.iter() {
Expand Down Expand Up @@ -197,8 +198,8 @@ impl PhysicalPlanBuilder {
&self,
pre_aggregation: &Rc<PreAggregation>,
_context: &PhysicalPlanBuilderContext,
render_references: &mut HashMap<String, QualifiedColumnName>,
measure_references: &mut HashMap<String, QualifiedColumnName>,
dimensions_references: &mut HashMap<String, QualifiedColumnName>,
) -> Result<Rc<From>, CubeError> {
let mut pre_aggregation_schema = Schema::empty();
let pre_aggregation_alias = PlanSqlTemplates::memeber_alias_name(
Expand All @@ -213,7 +214,7 @@ impl PhysicalPlanBuilder {
&dim.alias_suffix(),
self.query_tools.clone(),
)?;
render_references.insert(
dimensions_references.insert(
dim.full_name(),
QualifiedColumnName::new(Some(pre_aggregation_alias.clone()), alias.clone()),
);
Expand All @@ -226,16 +227,10 @@ impl PhysicalPlanBuilder {
granularity,
self.query_tools.clone(),
)?;
render_references.insert(
dimensions_references.insert(
dim.full_name(),
QualifiedColumnName::new(Some(pre_aggregation_alias.clone()), alias.clone()),
);
if let Some(granularity) = &granularity {
render_references.insert(
format!("{}_{}", dim.full_name(), granularity),
QualifiedColumnName::new(Some(pre_aggregation_alias.clone()), alias.clone()),
);
}
pre_aggregation_schema.add_column(SchemaColumn::new(alias, Some(dim.full_name())));
}
for meas in pre_aggregation.measures.iter() {
Expand Down Expand Up @@ -982,9 +977,7 @@ impl PhysicalPlanBuilder {
));
};

let templates = self.query_tools.plan_sql_templates();

let ts_date_range = if templates.supports_generated_time_series()
let ts_date_range = if self.plan_sql_templates.supports_generated_time_series()
&& granularity_obj.is_predefined_granularity()
{
if let Some(date_range) = time_dimension_symbol
Expand Down
17 changes: 5 additions & 12 deletions rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ impl RegularRollingWindowJoinCondition {
};

let trailing_start = if let Some(trailing_interval) = &self.trailing_interval {
templates
.base_tools()
.subtract_interval(start_date, trailing_interval.clone())?
templates.subtract_interval(start_date, trailing_interval.clone())?
} else {
start_date
};
Expand All @@ -72,9 +70,7 @@ impl RegularRollingWindowJoinCondition {
};

let leading_end = if let Some(leading_interval) = &self.leading_interval {
templates
.base_tools()
.add_interval(end_date, leading_interval.clone())?
templates.add_interval(end_date, leading_interval.clone())?
} else {
end_date
};
Expand Down Expand Up @@ -121,7 +117,7 @@ pub struct ToDateRollingWindowJoinCondition {
time_series_source: String,
granularity: String,
time_dimension: Expr,
query_tools: Rc<QueryTools>,
_query_tools: Rc<QueryTools>,
}

impl ToDateRollingWindowJoinCondition {
Expand All @@ -135,7 +131,7 @@ impl ToDateRollingWindowJoinCondition {
time_series_source,
granularity,
time_dimension,
query_tools,
_query_tools: query_tools,
}
}

Expand All @@ -150,10 +146,7 @@ impl ToDateRollingWindowJoinCondition {
templates.column_reference(&Some(self.time_series_source.clone()), "date_to")?;
let date_to =
templates.column_reference(&Some(self.time_series_source.clone()), "date_from")?;
let grouped_from = self
.query_tools
.base_tools()
.time_grouped_column(self.granularity.clone(), date_from)?;
let grouped_from = templates.time_grouped_column(self.granularity.clone(), date_from)?;
let result = format!("{date_column} >= {grouped_from} and {date_column} <= {date_to}");
Ok(result)
}
Expand Down
1 change: 0 additions & 1 deletion rust/cubesqlplanner/cubesqlplanner/src/plan/time_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ impl TimeSeries {
&& self.granularity.is_predefined_granularity()
{
let interval_description = templates
.base_tools()
.interval_and_minimal_time_unit(self.granularity.granularity_interval().clone())?;
if interval_description.len() != 2 {
return Err(CubeError::internal(
Expand Down
Loading
Loading