diff --git a/tesseract-core/src/query_ir.rs b/tesseract-core/src/query_ir.rs index d2dc29aa..cda18e45 100644 --- a/tesseract-core/src/query_ir.rs +++ b/tesseract-core/src/query_ir.rs @@ -77,13 +77,50 @@ impl DrilldownSql { cols } + pub fn col_alias_string2(&self) -> String { + let cols = self.col_alias_vec2(); + join(cols, ", ") + } + + fn col_alias_vec2(&self) -> Vec { + let mut cols: Vec<_> = self.level_columns.iter() + .map(|l| { + if let Some(ref name_col) = l.name_column { + format!("{}.{} as {}_{}, {} as {}_{}", + self.table.name, + l.key_column, + l.key_column, + self.alias_postfix, + name_col, + name_col, + self.alias_postfix, + ) + } else { + format!("{}.{} as {}_{}", + self.table.name, + l.key_column, + l.key_column, + self.alias_postfix, + ) + } + }).collect(); + + if self.property_columns.len() != 0 { + cols.push( + join(&self.property_columns, ", ") + ); + } + + cols + } + pub fn col_alias_only_string(&self) -> String { let cols = self.col_alias_only_vec(); join(cols, ", ") } pub fn col_alias_only_vec(&self) -> Vec { - let mut cols = vec![]; + let mut cols = vec![]; // can't just map the cols, because some levels // produce two and some produce one @@ -370,4 +407,3 @@ pub fn dim_subquery(drill: Option<&DrilldownSql>, cut: Option<&CutSql>) -> DimSu dim_cols: None, } } - diff --git a/tesseract-postgres/Cargo.toml b/tesseract-postgres/Cargo.toml index 64d7f7c5..2736488a 100644 --- a/tesseract-postgres/Cargo.toml +++ b/tesseract-postgres/Cargo.toml @@ -14,6 +14,7 @@ tokio = "0.1" bb8 = "0.3" bb8-postgres = "0.3" futures-state-stream = "0.2" +itertools = "0.8.0" [dependencies.tesseract-core] path = "../tesseract-core" diff --git a/tesseract-postgres/src/lib.rs b/tesseract-postgres/src/lib.rs index f9f3bad3..7545e938 100644 --- a/tesseract-postgres/src/lib.rs +++ b/tesseract-postgres/src/lib.rs @@ -1,5 +1,5 @@ use failure::{Error, format_err}; -use tesseract_core::{Backend, DataFrame}; +use tesseract_core::{Backend, DataFrame, QueryIr}; use futures::{Future, Stream}; use tokio_postgres::NoTls; extern crate futures; @@ -16,6 +16,10 @@ use futures::{ }; mod df; +mod sql; + +use self::sql::postgres_sql; + use self::df::{rows_to_df}; #[derive(Clone)] @@ -76,6 +80,23 @@ impl Backend for Postgres { fn box_clone(&self) -> Box { Box::new((*self).clone()) } + + fn generate_sql(&self, query_ir: QueryIr) -> String { + postgres_sql( + &query_ir.table, + &query_ir.cuts, + &query_ir.drills, + &query_ir.meas, + &query_ir.filters, + &query_ir.top, + &query_ir.top_where, + &query_ir.sort, + &query_ir.limit, + &query_ir.rca, + &query_ir.growth, + ) + } + } diff --git a/tesseract-postgres/src/sql.rs b/tesseract-postgres/src/sql.rs new file mode 100644 index 00000000..87f43846 --- /dev/null +++ b/tesseract-postgres/src/sql.rs @@ -0,0 +1,51 @@ +mod aggregator; +mod growth; +mod options; +mod primary_agg; + +use tesseract_core::query_ir::{ + TableSql, + CutSql, + DrilldownSql, + MeasureSql, + TopSql, + TopWhereSql, + SortSql, + LimitSql, + RcaSql, + GrowthSql, + FilterSql, + dim_subquery, +}; +use self::options::wrap_options; +use self::primary_agg::primary_agg; +use itertools::join; + +/// Error checking is done before this point. This string formatter +/// accepts any input +pub fn postgres_sql( + table: &TableSql, + cuts: &[CutSql], + drills: &[DrilldownSql], + meas: &[MeasureSql], + filters: &[FilterSql], + // TODO put Filters and Calculations into own structs + top: &Option, + top_where: &Option, + sort: &Option, + limit: &Option, + rca: &Option, + growth: &Option, +) -> String +{ + let mut final_sql = primary_agg(table, cuts, drills, meas); + + if let Some(growth) = growth { + let (sql, drill_cols) = growth::calculate(final_sql, drills, meas.len(), growth); + final_sql = sql; + } + + final_sql = wrap_options(final_sql, drills, top, top_where, sort, limit, filters); + + final_sql +} diff --git a/tesseract-postgres/src/sql/aggregator.rs b/tesseract-postgres/src/sql/aggregator.rs new file mode 100644 index 00000000..a14fc5f5 --- /dev/null +++ b/tesseract-postgres/src/sql/aggregator.rs @@ -0,0 +1,288 @@ +//! Applying aggregates to measures +//! +//! This is a little complex, because some formulas are complex. +//! +//! for performance reasons, there's an aggregation at the fact table scan level, +//! and then a second aggregation when rolling up to a parent level. +//! +//! This works great for simple aggregations like sum, doing it in two parts doesn't +//! affect the aggregation. +//! +//! However, for more complex formulas like median, weighted avg, and moe, applying the +//! full formula to the first pass loses information needed for the second pass. +//! +//! Therefore, I've hardcoded weighted avg and moe so that the sums are done in the first +//! pass, but then the formula is applied at the second pass. +//! +//! median is not yet implemented. Custom is halfway implemented, but will need some guardrails. + +use log::*; +use itertools::join; +use tesseract_core::Aggregator; + +/// First pass for aggregator +/// This is called only when doing aggregations on the fact table. +/// For more complex aggregations like weighted average and moe, some component +/// parts are aggregated here, but the equation (with divisions or other complex +/// arithmetic) are not called until the final pass +pub fn agg_sql_string_pass_1(col: &str, aggregator: &Aggregator, mea_idx: usize) -> String { + info!("{:?}", aggregator); + + match aggregator { + Aggregator::Sum => format!("sum({}) as m{}", col, mea_idx), + Aggregator::Count => format!("count({}) as m{}", col, mea_idx), + Aggregator::Average => format!("avg({}) as m{}", col, mea_idx), + Aggregator::Median => format!("median({}) as m{}", col, mea_idx), + Aggregator::WeightedAverage { weight_column } => { + format!("sum({0} * {1}) as m{2}_weighted_avg_num, sum({1}) as m{2}_weighted_avg_denom", + col, + weight_column, + mea_idx, + ) + }, + Aggregator::Moe { secondary_columns, .. }=> { + let secondaries = secondary_columns.iter().enumerate() + .map(|(n, s_col)| { + format!("sum({}) as m{}_moe_secondary_{}", s_col, mea_idx, n) + }); + + format!("sum({}) as m{}_moe_primary, {}", + col, + mea_idx, + join(secondaries, ", "), + ) + }, + Aggregator::WeightedAverageMoe { primary_weight, secondary_weight_columns, .. }=> { + let secondaries = secondary_weight_columns.iter().enumerate() + .map(|(n, s_col)| { + format!("sum({} * {}) as m{}_moe_secondary_weighted_avg_num_{}, sum({}) as m{}_moe_secondary_weighted_avg_denom_{}", + col, + s_col, + mea_idx, + n, + s_col, + mea_idx, + n, + ) + }); + + format!("sum({} * {}) as m{}_moe_primary_weighted_avg_num, sum({}) as m{}_moe_primary_weighted_avg_denom, {}", + col, + primary_weight, + mea_idx, + primary_weight, + mea_idx, + join(secondaries, ", "), + ) + }, + Aggregator::Custom(s) => { + let custom = s.replace("{}", col); + format!("{} as m{}", custom, mea_idx) + }, + } +} + +// this is used to select mea cols as they bubble up from the fact subquery through +// each subquery join +pub fn agg_sql_string_select_mea(aggregator: &Aggregator, mea_idx: usize) -> String { + match aggregator { + Aggregator::Sum => format!("m{0}", mea_idx), + Aggregator::Count => format!("m{0}", mea_idx), + Aggregator::Average => format!("m{0}", mea_idx), + Aggregator::Median => format!("m{0}", mea_idx), + Aggregator::WeightedAverage { .. } => { + format!("m{0}_weighted_avg_num, m{0}_weighted_avg_denom", + mea_idx, + ) + }, + Aggregator::Moe { secondary_columns, .. }=> { + let secondaries = secondary_columns.iter().enumerate() + .map(|(n, _)| { + format!("m{}_moe_secondary_{}", mea_idx, n) + }); + + format!("m{}_moe_primary, {}", + mea_idx, + join(secondaries, ", "), + ) + }, + Aggregator::WeightedAverageMoe { secondary_weight_columns, .. }=> { + let secondaries = secondary_weight_columns.iter().enumerate() + .map(|(n, _)| { + format!("m{0}_moe_secondary_weighted_avg_num_{1}, m{0}_moe_secondary_weighted_avg_denom_{1}", mea_idx, n) + }); + + format!("m{}_moe_primary_weighted_avg_num, m{}_moe_primary_weighted_avg_denom, {}", + mea_idx, + mea_idx, + join(secondaries, ", "), + ) + }, + Aggregator::Custom(_) => format!("m{}", mea_idx), + } +} + +/// computes final formula for aggregates after all joins +/// For simple aggregates, can just apply the fn and add alias +/// +/// For more complex aggregations, the full formula an be applied at this level +pub fn agg_sql_string_pass_2(aggregator: &Aggregator, mea_idx: usize) -> String { + info!("{:?}", aggregator); + + match aggregator { + Aggregator::Sum => format!("sum(m{0}) as final_m{0}", mea_idx), + Aggregator::Count => format!("sum(m{0}) as final_m{0}", mea_idx), + Aggregator::Average => format!("avg(m{0}) as final_m{0}", mea_idx), + Aggregator::Median => format!("median(m{0}) as final_m{0}", mea_idx), + Aggregator::WeightedAverage { .. } => { + format!("(sum(m{0}_weighted_avg_num) / sum(m{0}_weighted_avg_denom)) as final_m{0}", + mea_idx, + ) + }, + Aggregator::Moe { design_factor, secondary_columns }=> { + let inner_seq = secondary_columns.iter().enumerate() + .map(|(n, _)| { + format!("pow(sum(m{0}_moe_primary) - sum(m{0}_moe_secondary_{1}), 2)", + mea_idx, + n, + ) + }); + let inner_seq = join(inner_seq, " + "); + + format!("1.645 * sqrt({} * ({}))", + design_factor / secondary_columns.len() as f64, + inner_seq, + ) + }, + Aggregator::WeightedAverageMoe { design_factor, secondary_weight_columns, .. }=> { + let inner_seq = secondary_weight_columns.iter().enumerate() + .map(|(n, _)| { + format!("pow(\ + (sum(m{0}_moe_primary_weighted_avg_num) / sum(m{0}_moe_primary_weighted_avg_denom)) - \ + (sum(m{0}_moe_secondary_weighted_avg_num_{1}) / sum(m{0}_moe_secondary_weighted_avg_denom_{1}))\ + , 2)", + mea_idx, + n, + ) + }); + let inner_seq = join(inner_seq, " + "); + + format!("1.645 * sqrt({} * ({}))", + design_factor / secondary_weight_columns.len() as f64, + inner_seq, + ) + }, + Aggregator::Custom(s) => { + let custom = s.replace("{}", &format!("m{}", mea_idx)); + format!("{} as m{}", custom, mea_idx) + }, + } +} + +#[cfg(test)] +mod test { + use super::*; + + use tesseract_core::schema::aggregator::Aggregator; + + #[test] + fn basic_aggs() { + assert_eq!( + agg_sql_string_pass_1("col_1".into(), &Aggregator::Sum, 0), + "sum(col_1) as m0".to_owned(), + ); + assert_eq!( + agg_sql_string_pass_2(&Aggregator::Sum, 0), + "sum(m0) as final_m0".to_owned(), + ); + assert_eq!( + agg_sql_string_select_mea(&Aggregator::Sum, 0), + "m0".to_owned(), + ); + } + + #[test] + fn weighted_avg() { + let agg = Aggregator::WeightedAverage { + weight_column: "weight_col".into(), + }; + assert_eq!( + agg_sql_string_pass_1("col_1".into(), &agg, 0), + "sum(col_1 * weight_col) as m0_weighted_avg_num, sum(weight_col) as m0_weighted_avg_denom".to_owned(), + ); + assert_eq!( + agg_sql_string_pass_2(&agg, 0), + "(sum(m0_weighted_avg_num) / sum(m0_weighted_avg_denom)) as final_m0".to_owned(), + ); + assert_eq!( + agg_sql_string_select_mea(&agg, 0), + "m0_weighted_avg_num, m0_weighted_avg_denom".to_owned(), + ); + } + + #[test] + fn moe() { + let agg = Aggregator::Moe { + design_factor: 3.0, + secondary_columns: vec!["s0".into(), "s1".into(), "s2".into()], + }; + assert_eq!( + agg_sql_string_pass_1("col_1".into(), &agg, 0), + "sum(col_1) as m0_moe_primary, \ + sum(s0) as m0_moe_secondary_0, \ + sum(s1) as m0_moe_secondary_1, \ + sum(s2) as m0_moe_secondary_2\ + ".to_owned(), + ); + assert_eq!( + agg_sql_string_pass_2(&agg, 0), + "1.645 * sqrt(1 * (\ + pow(sum(m0_moe_primary) - sum(m0_moe_secondary_0), 2) + \ + pow(sum(m0_moe_primary) - sum(m0_moe_secondary_1), 2) + \ + pow(sum(m0_moe_primary) - sum(m0_moe_secondary_2), 2)\ + ))\ + ".to_owned(), + ); + assert_eq!( + agg_sql_string_select_mea(&agg, 0), + "m0_moe_primary, m0_moe_secondary_0, m0_moe_secondary_1, m0_moe_secondary_2".to_owned(), + ); + } + + #[test] + fn weighted_average_moe() { + let agg = Aggregator::WeightedAverageMoe { + design_factor: 3.0, + primary_weight: "w".into(), + secondary_weight_columns: vec!["w0".into(), "w1".into(), "w2".into()], + }; + assert_eq!( + agg_sql_string_pass_1("col_1".into(), &agg, 0), + "sum(col_1 * w) as m0_moe_primary_weighted_avg_num, \ + sum(w) as m0_moe_primary_weighted_avg_denom, \ + sum(col_1 * w0) as m0_moe_secondary_weighted_avg_num_0, \ + sum(w0) as m0_moe_secondary_weighted_avg_denom_0, \ + sum(col_1 * w1) as m0_moe_secondary_weighted_avg_num_1, \ + sum(w1) as m0_moe_secondary_weighted_avg_denom_1, \ + sum(col_1 * w2) as m0_moe_secondary_weighted_avg_num_2, \ + sum(w2) as m0_moe_secondary_weighted_avg_denom_2\ + ".to_owned(), + ); + assert_eq!( + agg_sql_string_pass_2(&agg, 0), + "1.645 * sqrt(1 * (\ + pow((sum(m0_moe_primary_weighted_avg_num) / sum(m0_moe_primary_weighted_avg_denom)) - (sum(m0_moe_secondary_weighted_avg_num_0) / sum(m0_moe_secondary_weighted_avg_denom_0)), 2) + \ + pow((sum(m0_moe_primary_weighted_avg_num) / sum(m0_moe_primary_weighted_avg_denom)) - (sum(m0_moe_secondary_weighted_avg_num_1) / sum(m0_moe_secondary_weighted_avg_denom_1)), 2) + \ + pow((sum(m0_moe_primary_weighted_avg_num) / sum(m0_moe_primary_weighted_avg_denom)) - (sum(m0_moe_secondary_weighted_avg_num_2) / sum(m0_moe_secondary_weighted_avg_denom_2)), 2)\ + ))\ + ".to_owned(), + ); + assert_eq!( + agg_sql_string_select_mea(&agg, 0), + "m0_moe_primary_weighted_avg_num, m0_moe_primary_weighted_avg_denom, \ + m0_moe_secondary_weighted_avg_num_0, m0_moe_secondary_weighted_avg_denom_0, \ + m0_moe_secondary_weighted_avg_num_1, m0_moe_secondary_weighted_avg_denom_1, \ + m0_moe_secondary_weighted_avg_num_2, m0_moe_secondary_weighted_avg_denom_2".to_owned(), + ); + } +} diff --git a/tesseract-postgres/src/sql/growth.rs b/tesseract-postgres/src/sql/growth.rs new file mode 100644 index 00000000..e1ee7d7f --- /dev/null +++ b/tesseract-postgres/src/sql/growth.rs @@ -0,0 +1,40 @@ +use itertools::join; + +use super::GrowthSql; + +use tesseract_core::query_ir::{DrilldownSql}; + +pub fn drilldown_list_helper(table_alias: &str, my_drills: &Vec<&DrilldownSql>) -> String +{ + // This helper function takes a table alias and a vector of drilldowns + // and returns a comma separated list of the column aliases prefixed by the table alias + let time_drill_cols = my_drills.iter().map(|drill| { + let tmp: Vec = drill.col_alias_only_vec().iter().map(|alias| format!("{}.{}", table_alias, alias)).collect(); + return join(tmp, ", "); + }); + let time_drill_cols = join(time_drill_cols, ", "); + time_drill_cols +} + +pub fn calculate( + final_sql: String, + drills: &[DrilldownSql], + num_measures: usize, + growth: &GrowthSql, +) -> (String, String) +{ + let growth_table_alias = "growth_subquery"; + let non_time_drills: Vec<&DrilldownSql> = drills.iter().filter(|drill| drill.col_qual_string() != growth.time_drill.col_qual_string()).collect(); + let final_non_time_drill_cols: String = drilldown_list_helper(growth_table_alias, &non_time_drills); + let growth_sql = format!("SELECT *, \ + coalesce(final_m{measure_idx} - (lag(final_m{measure_idx}) OVER w), 0) as growth_value, \ + coalesce(((final_m{measure_idx} - (lag(final_m{measure_idx}) OVER w)) / (lag(final_m{measure_idx}::float) OVER w)), 0) as growth_pct \ + FROM ({0}) {growth_table_alias} \ + WINDOW w as (PARTITION BY {drilldowns_ex_time} ORDER BY {growth_table_alias}.{time_col})", + final_sql.to_string(), + measure_idx=0, + drilldowns_ex_time=final_non_time_drill_cols, + growth_table_alias=growth_table_alias, + time_col=growth.time_drill.col_alias_only_string()); + (growth_sql, "".to_string()) +} diff --git a/tesseract-postgres/src/sql/options.rs b/tesseract-postgres/src/sql/options.rs new file mode 100644 index 00000000..69107f56 --- /dev/null +++ b/tesseract-postgres/src/sql/options.rs @@ -0,0 +1,83 @@ +use itertools::join; + +use super::{ + LimitSql, + SortSql, + TopSql, + TopWhereSql, + FilterSql, +}; + +use tesseract_core::query_ir::{DrilldownSql}; + +pub fn wrap_options( + final_sql: String, + drills: &[DrilldownSql], + top: &Option, + top_where: &Option, + sort: &Option, + limit: &Option, + filters: &[FilterSql], +) -> String +{ + let mut final_sql = final_sql; + + // TODO: top query not yet supported + + let final_drill_cols = drills.iter().map(|drill| { + let tmp: Vec = drill.col_alias_only_vec().iter().map(|alias| format!("options_subquery.{}", alias)).collect(); + return join(tmp, ", "); + }); + let final_drill_cols = join(final_drill_cols, ", "); + + + // There's a final wrapper clause no matter what. + // - it sorts by final_drill_cols + // - unless there's a specific sort, which just goes to head of cols + // - or if there's a top, sort by the by_dim col. + // - limits + let limit_sql = { + if let Some(limit) = limit { + if let Some(offset) = limit.offset { + format!("LIMIT {} OFFSET {}", offset, limit.n) + } else { + format!("LIMIT {}", limit.n) + } + } else { + "".to_string() + } + }; + + let sort_sql = { + if let Some(sort) = sort { + format!("ORDER BY {} {}, {}", + sort.column, + sort.direction.sql_string(), + final_drill_cols + ) + } + else { + // by default dont sort unless user asks + "".to_string() + } + }; + + let filters_sql = if !filters.is_empty() { + let filter_clauses = filters.iter() + .map(|f| format!("{} {}", f.by_column, f.constraint.sql_string())); + format!("WHERE {}", join(filter_clauses, " AND ")) + } else { + "".into() + }; + + + final_sql = format!("SELECT * FROM ({}) options_subquery {} {} {}", + final_sql, + filters_sql, + sort_sql, + limit_sql, + ); + + final_sql +} + diff --git a/tesseract-postgres/src/sql/primary_agg.rs b/tesseract-postgres/src/sql/primary_agg.rs new file mode 100644 index 00000000..6b12f539 --- /dev/null +++ b/tesseract-postgres/src/sql/primary_agg.rs @@ -0,0 +1,78 @@ +use itertools::join; +//use super::Aggregator; +extern crate tesseract_core; +use tesseract_core::Aggregator; + +use super::{ + TableSql, + CutSql, + DrilldownSql, + MeasureSql, + dim_subquery, +}; + +/// Error checking is done before this point. This string formatter +/// accepts any input +pub fn primary_agg( + table: &TableSql, + cuts: &[CutSql], + drills: &[DrilldownSql], + meas: &[MeasureSql], +) -> String +{ + // hack for now... remove later + fn agg_sql_string(measure_idx: usize, m: &MeasureSql) -> String { + match &m.aggregator { + Aggregator::Sum => format!("sum({}) as final_m{}", &m.column, measure_idx), + Aggregator::Count => format!("count({}) as final_m{}", &m.column, measure_idx), + Aggregator::Average => format!("avg({}) as final_m{}", &m.column, measure_idx), + // median doesn't work like this + Aggregator::Median => format!("median"), + Aggregator::WeightedAverage {..} => format!("avg"), + Aggregator::Moe {..} => format!(""), + Aggregator::WeightedAverageMoe {..} => format!(""), + Aggregator::Custom(s) => format!("{}", s), + } + } + + // -------------------------------------------------- + // copied from primary_agg for clickhouse + let ext_drills: Vec<_> = drills.iter() + .filter(|d| d.table.name != table.name) + .collect(); + + let drill_cols_w_aliases = join(drills.iter().map(|d| d.col_alias_string2()), ", "); + let drill_cols = join(drills.iter().map(|d| d.col_qual_string()), ", "); + let mea_cols = join(meas.iter().enumerate().map(|(idx, m)| agg_sql_string(idx,m)), ", "); + + let mut final_sql = format!("select {}, {} from {}", + drill_cols_w_aliases, + mea_cols, + table.name, + ); + + // join external dims + if !ext_drills.is_empty() { + let join_ext_dim_clauses = join(ext_drills.iter() + .map(|d| { + format!("inner join {} on {}.{} = {}.{}", + d.table.full_name(), + d.table.full_name(), + d.primary_key, + table.name, + d.foreign_key, + ) + }), " "); // note for postgres do not use commas! + + final_sql = format!("{} {}", final_sql, join_ext_dim_clauses); + } + + if !cuts.is_empty() { + let cut_clauses = join(cuts.iter().map(|c| format!("{} in ({})", c.col_qual_string(), c.members_string())), " and "); + final_sql = format!("{} where {}", final_sql, cut_clauses); + } + + final_sql = format!("{} group by {}", final_sql, drill_cols); // remove semicolon + + final_sql +}