diff --git a/src/common/base/src/base/watch_notify.rs b/src/common/base/src/base/watch_notify.rs index be05dfc9028c0..61d0528a94c77 100644 --- a/src/common/base/src/base/watch_notify.rs +++ b/src/common/base/src/base/watch_notify.rs @@ -41,6 +41,16 @@ impl WatchNotify { let _ = rx.changed().await; } + pub fn has_notified(&self) -> bool { + match self.rx.has_changed() { + Ok(b) => b, + Err(_) => { + // The sender has never dropped before + unreachable!() + } + } + } + pub fn notify_waiters(&self) { let _ = self.tx.send_replace(true); } @@ -61,11 +71,18 @@ mod tests { #[tokio::test] async fn test_notify_waiters_ahead() { let notify = WatchNotify::new(); + assert!(!notify.has_notified()); + let notified1 = notify.notified(); + assert!(!notify.has_notified()); + // notify_waiters ahead of notified being instantiated and awaited notify.notify_waiters(); - + assert!(notify.has_notified()); // this should not await indefinitely - let notified = notify.notified(); - notified.await; + let notified2 = notify.notified(); + notified2.await; + + notified1.await; + assert!(notify.has_notified()); } } diff --git a/src/query/expression/src/types/array.rs b/src/query/expression/src/types/array.rs index d3097c439716a..214e89f3a453f 100755 --- a/src/query/expression/src/types/array.rs +++ b/src/query/expression/src/types/array.rs @@ -52,7 +52,7 @@ impl ValueType for ArrayType { scalar.clone() } - fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option> { + fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option> { match scalar { ScalarRef::Array(array) => T::try_downcast_column(array), _ => None, diff --git a/src/query/expression/src/types/boolean.rs b/src/query/expression/src/types/boolean.rs index 35ba609bd888e..10ae5fac3717d 100644 --- a/src/query/expression/src/types/boolean.rs +++ b/src/query/expression/src/types/boolean.rs @@ -48,7 +48,7 @@ impl ValueType for BooleanType { *scalar } - fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option> { + fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option> { match scalar { ScalarRef::Boolean(scalar) => Some(*scalar), _ => None, diff --git a/src/query/expression/src/types/date.rs b/src/query/expression/src/types/date.rs index 61b13bbea8a83..c720f3d3ce463 100644 --- a/src/query/expression/src/types/date.rs +++ b/src/query/expression/src/types/date.rs @@ -78,7 +78,7 @@ impl ValueType for DateType { *scalar } - fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option> { + fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option> { match scalar { ScalarRef::Date(scalar) => Some(*scalar), _ => None, diff --git a/src/query/expression/src/types/decimal.rs b/src/query/expression/src/types/decimal.rs index 04ba37c64ea67..33de3db6c5e12 100644 --- a/src/query/expression/src/types/decimal.rs +++ b/src/query/expression/src/types/decimal.rs @@ -72,7 +72,7 @@ impl ValueType for DecimalType { *scalar } - fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option> { + fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option> { Num::try_downcast_scalar(scalar.as_decimal()?) 
} diff --git a/src/query/expression/src/types/empty_array.rs b/src/query/expression/src/types/empty_array.rs index 3f365ed94d055..353a468c3284d 100644 --- a/src/query/expression/src/types/empty_array.rs +++ b/src/query/expression/src/types/empty_array.rs @@ -45,7 +45,7 @@ impl ValueType for EmptyArrayType { *scalar } - fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option> { + fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option> { match scalar { ScalarRef::EmptyArray => Some(()), _ => None, diff --git a/src/query/expression/src/types/empty_map.rs b/src/query/expression/src/types/empty_map.rs index 7e7c9bf730ae1..df2a151d8ee42 100644 --- a/src/query/expression/src/types/empty_map.rs +++ b/src/query/expression/src/types/empty_map.rs @@ -45,7 +45,7 @@ impl ValueType for EmptyMapType { *scalar } - fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option> { + fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option> { match scalar { ScalarRef::EmptyMap => Some(()), _ => None, diff --git a/src/query/expression/src/types/interval.rs b/src/query/expression/src/types/interval.rs index 4c5a032614277..7e6ca9c70d25e 100644 --- a/src/query/expression/src/types/interval.rs +++ b/src/query/expression/src/types/interval.rs @@ -52,7 +52,7 @@ impl ValueType for IntervalType { *scalar } - fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option> { + fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option> { match scalar { ScalarRef::Interval(scalar) => Some(*scalar), _ => None, diff --git a/src/query/expression/src/types/map.rs b/src/query/expression/src/types/map.rs index bb4677b3a9098..edb575efe1306 100755 --- a/src/query/expression/src/types/map.rs +++ b/src/query/expression/src/types/map.rs @@ -341,7 +341,7 @@ impl ValueType for MapType { as ValueType>::to_scalar_ref(scalar) } - fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option> { + fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option> { match scalar { ScalarRef::Map(array) => KvPair::::try_downcast_column(array), _ => None, diff --git a/src/query/expression/src/types/null.rs b/src/query/expression/src/types/null.rs index ac05707dbc238..e2806fa2e2987 100644 --- a/src/query/expression/src/types/null.rs +++ b/src/query/expression/src/types/null.rs @@ -46,7 +46,7 @@ impl ValueType for NullType { *scalar } - fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option> { + fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option> { match scalar { ScalarRef::Null => Some(()), _ => None, diff --git a/src/query/expression/src/types/number.rs b/src/query/expression/src/types/number.rs index a98afe84ed333..5879e035c1738 100644 --- a/src/query/expression/src/types/number.rs +++ b/src/query/expression/src/types/number.rs @@ -117,7 +117,7 @@ impl ValueType for NumberType { *scalar } - fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option> { + fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option> { Num::try_downcast_scalar(scalar.as_number()?) 
} diff --git a/src/query/expression/src/types/timestamp.rs b/src/query/expression/src/types/timestamp.rs index 687d1140451b0..72661e918ad0d 100644 --- a/src/query/expression/src/types/timestamp.rs +++ b/src/query/expression/src/types/timestamp.rs @@ -83,7 +83,7 @@ impl ValueType for TimestampType { *scalar } - fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option> { + fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option> { match scalar { ScalarRef::Timestamp(scalar) => Some(*scalar), _ => None, diff --git a/src/query/pipeline/transforms/src/processors/transforms/mod.rs b/src/query/pipeline/transforms/src/processors/transforms/mod.rs index ec6ca0faf96a0..8fe951ce2c89a 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/mod.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/mod.rs @@ -41,7 +41,7 @@ pub use transform_compact_builder::*; pub use transform_compact_no_split_builder::*; pub use transform_dummy::*; pub use transform_k_way_merge_sort::*; -pub use transform_multi_sort_merge::try_add_multi_sort_merge; +pub use transform_multi_sort_merge::*; pub use transform_pipeline_helper::TransformPipelineHelper; pub use transform_retry_async::*; pub use transform_sort_merge::sort_merge; diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs index 3af2c413e7f28..7584378015106 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs @@ -60,6 +60,17 @@ impl Rows for BinaryColumn { fn slice(&self, range: Range) -> Self { self.slice(range) } + + fn scalar_as_item<'a>(s: &'a Scalar) -> Self::Item<'a> { + match s { + Scalar::Binary(s) => s, + _ => unreachable!(), + } + } + + fn owned_item(item: Self::Item<'_>) -> Scalar { + Scalar::Binary(Vec::from(item)) + } } impl RowConverter for CommonRowConverter { diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs index 479d2559c049e..5eb5036aec6c5 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs @@ -17,6 +17,7 @@ mod simple; mod utils; use std::fmt::Debug; +use std::ops::Range; pub use common::*; use databend_common_exception::ErrorCode; @@ -25,7 +26,9 @@ use databend_common_expression::types::ArgType; use databend_common_expression::types::DataType; use databend_common_expression::BlockEntry; use databend_common_expression::Column; +use databend_common_expression::DataBlock; use databend_common_expression::DataSchemaRef; +use databend_common_expression::Scalar; use databend_common_expression::SortColumnDescription; pub use simple::*; pub use utils::*; @@ -39,6 +42,18 @@ where Self: Sized + Debug output_schema: DataSchemaRef, ) -> Result; fn convert(&self, columns: &[BlockEntry], num_rows: usize) -> Result; + + fn convert_data_block( + &self, + sort_desc: &[SortColumnDescription], + data_block: &DataBlock, + ) -> Result { + let order_by_cols = sort_desc + .iter() + .map(|desc| data_block.get_by_offset(desc.offset).clone()) + .collect::>(); + self.convert(&order_by_cols, data_block.num_rows()) + } } /// Rows can be compared. 
@@ -82,5 +97,9 @@ where Self: Sized + Clone + Debug + Send self.row(self.len() - 1) } - fn slice(&self, range: std::ops::Range) -> Self; + fn slice(&self, range: Range) -> Self; + + fn scalar_as_item<'a>(s: &'a Scalar) -> Self::Item<'a>; + + fn owned_item(item: Self::Item<'_>) -> Scalar; } diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs index 0c7b45ab0268d..046dfa1b753c5 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs @@ -24,6 +24,7 @@ use databend_common_expression::BlockEntry; use databend_common_expression::Column; use databend_common_expression::ColumnBuilder; use databend_common_expression::DataSchemaRef; +use databend_common_expression::Scalar; use databend_common_expression::SortColumnDescription; use databend_common_expression::Value; @@ -70,6 +71,15 @@ where inner: T::slice_column(&self.inner, range), } } + + fn scalar_as_item<'a>(s: &'a Scalar) -> Self::Item<'a> { + let s = &s.as_ref(); + T::try_downcast_scalar(s).unwrap() + } + + fn owned_item(item: Self::Item<'_>) -> Scalar { + T::upcast_scalar(T::to_owned_scalar(item)) + } } /// Rows structure for single simple types. (numbers, date, timestamp) @@ -113,6 +123,15 @@ where inner: T::slice_column(&self.inner, range), } } + + fn scalar_as_item<'a>(s: &'a Scalar) -> Self::Item<'a> { + let s = &s.as_ref(); + Reverse(T::try_downcast_scalar(s).unwrap()) + } + + fn owned_item(item: Self::Item<'_>) -> Scalar { + T::upcast_scalar(T::to_owned_scalar(item.0)) + } } /// If there is only one sort field and its type is a primitive type, diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/utils.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/utils.rs index da1a63bf2deeb..373e2e2e99281 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/utils.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/utils.rs @@ -21,7 +21,6 @@ use databend_common_expression::types::NumberType; use databend_common_expression::types::StringType; use databend_common_expression::types::TimestampType; use databend_common_expression::with_number_mapped_type; -use databend_common_expression::BlockEntry; use databend_common_expression::Column; use databend_common_expression::DataBlock; use databend_common_expression::DataSchema; @@ -41,116 +40,125 @@ pub fn convert_rows( sort_desc: &[SortColumnDescription], data: DataBlock, ) -> Result { - let num_rows = data.num_rows(); - - if sort_desc.len() == 1 { - let sort_type = schema.field(sort_desc[0].offset).data_type(); - let asc = sort_desc[0].asc; - - let offset = sort_desc[0].offset; - let columns = &data.columns()[offset..offset + 1]; - - match_template! 
{ - T = [ Date => DateType, Timestamp => TimestampType, String => StringType ], - match sort_type { - DataType::T => { - if asc { - convert_columns::,SimpleRowConverter<_>>(schema, sort_desc, columns, num_rows) - } else { - convert_columns::,SimpleRowConverter<_>>(schema, sort_desc, columns, num_rows) - } - }, - DataType::Number(num_ty) => with_number_mapped_type!(|NUM_TYPE| match num_ty { - NumberDataType::NUM_TYPE => { - if asc { - convert_columns::>,SimpleRowConverter<_>>(schema, sort_desc, columns, num_rows) - } else { - convert_columns::>,SimpleRowConverter<_>>(schema, sort_desc, columns, num_rows) - } - } - }), - _ => convert_columns::(schema, sort_desc, columns, num_rows), - } + struct ConvertRowsVisitor<'a> { + schema: DataSchemaRef, + sort_desc: &'a [SortColumnDescription], + data: DataBlock, + } + + impl RowsTypeVisitor for ConvertRowsVisitor<'_> { + type Result = Result; + fn schema(&self) -> DataSchemaRef { + self.schema.clone() + } + + fn sort_desc(&self) -> &[SortColumnDescription] { + self.sort_desc + } + + fn visit_type(&mut self) -> Self::Result + where + R: Rows + 'static, + C: RowConverter + Send + 'static, + { + let columns = self + .sort_desc + .iter() + .map(|desc| self.data.get_by_offset(desc.offset).to_owned()) + .collect::>(); + + let converter = C::create(self.sort_desc, self.schema.clone())?; + let rows = C::convert(&converter, &columns, self.data.num_rows())?; + Ok(rows.to_column()) } - } else { - let columns = sort_desc - .iter() - .map(|desc| data.get_by_offset(desc.offset).to_owned()) - .collect::>(); - convert_columns::(schema, sort_desc, &columns, num_rows) } -} -fn convert_columns>( - schema: DataSchemaRef, - sort_desc: &[SortColumnDescription], - columns: &[BlockEntry], - num_rows: usize, -) -> Result { - let converter = C::create(sort_desc, schema)?; - let rows = C::convert(&converter, columns, num_rows)?; - Ok(rows.to_column()) + let mut visitor = ConvertRowsVisitor { + schema: schema.clone(), + sort_desc, + data, + }; + + select_row_type(&mut visitor) } -pub fn select_row_type(visitor: &mut impl RowsTypeVisitor) { - let sort_desc = visitor.sort_desc(); - if sort_desc.len() == 1 { - let schema = visitor.schema(); - let sort_type = schema.field(sort_desc[0].offset).data_type(); - let asc = sort_desc[0].asc; - - match_template! { - T = [ Date => DateType, Timestamp => TimestampType, String => StringType ], - match sort_type { - DataType::T => { - if asc { - visitor.visit_type::, SimpleRowConverter>() - } else { - visitor.visit_type::, SimpleRowConverter>() - } - }, - DataType::Number(num_ty) => with_number_mapped_type!(|NUM_TYPE| match num_ty { - NumberDataType::NUM_TYPE => { +pub fn select_row_type(visitor: &mut V) -> V::Result +where V: RowsTypeVisitor { + match &visitor.sort_desc() { + &[desc] => { + let schema = visitor.schema(); + let sort_type = schema.field(desc.offset).data_type(); + let asc = desc.asc; + + match_template! 
{ + T = [ Date => DateType, Timestamp => TimestampType, String => StringType ], + match sort_type { + DataType::T => { if asc { - visitor.visit_type::>, SimpleRowConverter>>() + visitor.visit_type::, SimpleRowConverter>() } else { - visitor.visit_type::>, SimpleRowConverter>>() + visitor.visit_type::, SimpleRowConverter>() + } + }, + DataType::Number(num_ty) => with_number_mapped_type!(|NUM_TYPE| match num_ty { + NumberDataType::NUM_TYPE => { + if asc { + visitor.visit_type::>, SimpleRowConverter>>() + } else { + visitor.visit_type::>, SimpleRowConverter>>() + } } + }), + _ => visitor.visit_type::() } - }), - _ => visitor.visit_type::() } } - } else { - visitor.visit_type::() + _ => visitor.visit_type::(), } } pub trait RowsTypeVisitor { + type Result; fn schema(&self) -> DataSchemaRef; fn sort_desc(&self) -> &[SortColumnDescription]; - fn visit_type(&mut self) + fn visit_type(&mut self) -> Self::Result where R: Rows + 'static, C: RowConverter + Send + 'static; } pub fn order_field_type(schema: &DataSchema, desc: &[SortColumnDescription]) -> DataType { - debug_assert!(!desc.is_empty()); - if desc.len() == 1 { - let order_by_field = schema.field(desc[0].offset); - if matches!( - order_by_field.data_type(), - DataType::Number(_) - | DataType::Date - | DataType::Timestamp - | DataType::Binary - | DataType::String - ) { - return order_by_field.data_type().clone(); + struct OrderFieldTypeVisitor<'a> { + schema: DataSchemaRef, + sort_desc: &'a [SortColumnDescription], + } + + impl RowsTypeVisitor for OrderFieldTypeVisitor<'_> { + type Result = DataType; + fn schema(&self) -> DataSchemaRef { + self.schema.clone() + } + + fn sort_desc(&self) -> &[SortColumnDescription] { + self.sort_desc + } + + fn visit_type(&mut self) -> Self::Result + where + R: Rows + 'static, + C: RowConverter + Send + 'static, + { + R::data_type() } } - DataType::Binary + + assert!(!desc.is_empty()); + let mut visitor = OrderFieldTypeVisitor { + schema: schema.clone().into(), + sort_desc: desc, + }; + + select_row_type(&mut visitor) } diff --git a/src/query/service/src/pipelines/builders/builder_sort.rs b/src/query/service/src/pipelines/builders/builder_sort.rs index fe7f8b72356c8..902cb1e66b088 100644 --- a/src/query/service/src/pipelines/builders/builder_sort.rs +++ b/src/query/service/src/pipelines/builders/builder_sort.rs @@ -33,6 +33,10 @@ use databend_common_storage::DataOperator; use databend_common_storages_fuse::TableContext; use crate::pipelines::memory_settings::MemorySettingsExt; +use crate::pipelines::processors::transforms::add_range_shuffle_route; +use crate::pipelines::processors::transforms::SortRangeExchange; +use crate::pipelines::processors::transforms::SortSampleState; +use crate::pipelines::processors::transforms::TransformLimit; use crate::pipelines::processors::transforms::TransformSortBuilder; use crate::pipelines::PipelineBuilder; use crate::sessions::QueryContext; @@ -133,9 +137,17 @@ impl PipelineBuilder { None => { // Build for single node mode. // We build the full sort pipeline for it. - builder - .remove_order_col_at_last() - .build_full_sort_pipeline(&mut self.main_pipeline) + if self.settings.get_enable_range_shuffle_sort()? 
+ && self.main_pipeline.output_len() > 1 + { + builder + .remove_order_col_at_last() + .build_range_shuffle_sort_pipeline(&mut self.main_pipeline) + } else { + builder + .remove_order_col_at_last() + .build_full_sort_pipeline(&mut self.main_pipeline) + } } } } @@ -148,6 +160,7 @@ pub struct SortPipelineBuilder { limit: Option, block_size: usize, remove_order_col_at_last: bool, + enable_loser_tree: bool, } impl SortPipelineBuilder { @@ -156,7 +169,9 @@ impl SortPipelineBuilder { schema: DataSchemaRef, sort_desc: Arc<[SortColumnDescription]>, ) -> Result { - let block_size = ctx.get_settings().get_max_block_size()? as usize; + let settings = ctx.get_settings(); + let block_size = settings.get_max_block_size()? as usize; + let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?; Ok(Self { ctx, schema, @@ -164,6 +179,7 @@ impl SortPipelineBuilder { limit: None, block_size, remove_order_col_at_last: false, + enable_loser_tree, }) } @@ -195,11 +211,77 @@ impl SortPipelineBuilder { self.build_merge_sort_pipeline(pipeline, false) } - pub fn build_merge_sort_pipeline( - self, - pipeline: &mut Pipeline, - order_col_generated: bool, - ) -> Result<()> { + fn build_range_shuffle_sort_pipeline(self, pipeline: &mut Pipeline) -> Result<()> { + let inputs = pipeline.output_len(); + let settings = self.ctx.get_settings(); + let num_exec = inputs; + let max_block_size = settings.get_max_block_size()? as usize; + + // Partial sort + pipeline.add_transformer(|| { + TransformSortPartial::new( + LimitType::from_limit_rows(self.limit), + self.sort_desc.clone(), + ) + }); + + let spiller = { + let location_prefix = self.ctx.query_id_spill_prefix(); + let config = SpillerConfig { + spiller_type: SpillerType::OrderBy, + location_prefix, + disk_spill: None, + use_parquet: settings.get_spilling_file_format()?.is_parquet(), + }; + let op = DataOperator::instance().spill_operator(); + Arc::new(Spiller::create(self.ctx.clone(), op, config)?) 
+ }; + + let memory_settings = MemorySettings::from_sort_settings(&self.ctx)?; + let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?; + + let builder = TransformSortBuilder::create( + self.schema.clone(), + self.sort_desc.clone(), + max_block_size, + spiller, + ) + .with_limit(self.limit) + .with_order_col_generated(false) + .with_output_order_col(false) + .with_memory_settings(memory_settings) + .with_enable_loser_tree(enable_loser_tree); + + pipeline.add_transform(|input, output| { + Ok(ProcessorPtr::create(builder.build_collect(input, output)?)) + })?; + + let state = SortSampleState::new(inputs, num_exec, builder.inner_schema(), max_block_size); + + builder.add_shuffle(pipeline, state.clone())?; + + pipeline.exchange(num_exec, Arc::new(SortRangeExchange)); + + pipeline.add_transform(|input, output| { + Ok(ProcessorPtr::create(builder.build_combine(input, output)?)) + })?; + + pipeline.add_transform(|input, output| { + Ok(ProcessorPtr::create(builder.build_exec(input, output)?)) + })?; + + add_range_shuffle_route(pipeline)?; + + if self.limit.is_none() { + return Ok(()); + } + + pipeline.add_transform(|input, output| { + TransformLimit::try_create(self.limit, 0, input, output).map(ProcessorPtr::create) + }) + } + + fn build_merge_sort(&self, pipeline: &mut Pipeline, order_col_generated: bool) -> Result<()> { // Merge sort let need_multi_merge = pipeline.output_len() > 1; let output_order_col = need_multi_merge || !self.remove_order_col_at_last; @@ -228,13 +310,11 @@ impl SortPipelineBuilder { use_parquet: settings.get_spilling_file_format()?.is_parquet(), }; let op = DataOperator::instance().spill_operator(); - Arc::new(Spiller::create(self.ctx.clone(), op, config.clone())?) + Arc::new(Spiller::create(self.ctx.clone(), op, config)?) }; pipeline.add_transform(|input, output| { let builder = TransformSortBuilder::create( - input, - output, sort_merge_output_schema.clone(), self.sort_desc.clone(), self.block_size, @@ -246,8 +326,17 @@ impl SortPipelineBuilder { .with_memory_settings(memory_settings.clone()) .with_enable_loser_tree(enable_loser_tree); - Ok(ProcessorPtr::create(builder.build()?)) - })?; + Ok(ProcessorPtr::create(builder.build(input, output)?)) + }) + } + + pub fn build_merge_sort_pipeline( + self, + pipeline: &mut Pipeline, + order_col_generated: bool, + ) -> Result<()> { + let need_multi_merge = pipeline.output_len() > 1; + self.build_merge_sort(pipeline, order_col_generated)?; if !need_multi_merge { return Ok(()); @@ -259,9 +348,8 @@ impl SortPipelineBuilder { pub fn build_multi_merge(self, pipeline: &mut Pipeline) -> Result<()> { // Multi-pipelines merge sort let settings = self.ctx.get_settings(); - let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?; - let max_threads = settings.get_max_threads()? as usize; if settings.get_enable_parallel_multi_merge_sort()? { + let max_threads = settings.get_max_threads()? 
as usize; add_k_way_merge_sort( pipeline, self.schema.clone(), @@ -270,7 +358,7 @@ impl SortPipelineBuilder { self.limit, self.sort_desc, self.remove_order_col_at_last, - enable_loser_tree, + self.enable_loser_tree, ) } else { try_add_multi_sort_merge( @@ -280,7 +368,7 @@ impl SortPipelineBuilder { self.limit, self.sort_desc, self.remove_order_col_at_last, - enable_loser_tree, + self.enable_loser_tree, ) } } diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs index c7b2c27272aeb..4059501a4ff9a 100644 --- a/src/query/service/src/pipelines/executor/executor_graph.rs +++ b/src/query/service/src/pipelines/executor/executor_graph.rs @@ -789,7 +789,7 @@ impl RunningGraph { true => Ok(()), false => Err(ErrorCode::Internal(format!( "Pipeline graph is not finished, details: {}", - self.format_graph_nodes() + self.format_graph_nodes(true) ))), } } @@ -855,7 +855,7 @@ impl RunningGraph { self.0.finished_notify.clone() } - pub fn format_graph_nodes(&self) -> String { + pub fn format_graph_nodes(&self, pretty: bool) -> String { pub struct NodeDisplay { id: usize, name: String, @@ -955,7 +955,11 @@ impl RunningGraph { } } - format!("{:?}", nodes_display) + if pretty { + format!("{:#?}", nodes_display) + } else { + format!("{:?}", nodes_display) + } } /// Change the priority diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index 7fef8013bbb51..c0a05425144a4 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -268,7 +268,7 @@ impl PipelineExecutor { pub fn format_graph_nodes(&self) -> String { match self { PipelineExecutor::QueryPipelineExecutor(executor) => executor.format_graph_nodes(), - PipelineExecutor::QueriesPipelineExecutor(v) => v.graph.format_graph_nodes(), + PipelineExecutor::QueriesPipelineExecutor(v) => v.graph.format_graph_nodes(false), } } diff --git a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs index 844a1d8316fd7..94b7d3b353c6e 100644 --- a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs @@ -431,7 +431,7 @@ impl QueryPipelineExecutor { } pub fn format_graph_nodes(&self) -> String { - self.graph.format_graph_nodes() + self.graph.format_graph_nodes(false) } pub fn fetch_plans_profile(&self, collect_metrics: bool) -> HashMap { diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs b/src/query/service/src/pipelines/processors/transforms/mod.rs index da8b50455878f..546ca3a9595ab 100644 --- a/src/query/service/src/pipelines/processors/transforms/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/mod.rs @@ -16,6 +16,7 @@ pub mod aggregator; mod hash_join; pub(crate) mod range_join; mod runtime_pool; +pub mod sort; mod transform_add_computed_columns; mod transform_add_const_columns; mod transform_add_internal_columns; @@ -29,7 +30,6 @@ mod transform_expression_scan; mod transform_filter; mod transform_limit; mod transform_merge_block; -mod transform_merge_sort; mod transform_null_if; mod transform_recursive_cte_scan; mod transform_recursive_cte_source; @@ -41,6 +41,7 @@ mod transform_udf_server; mod window; pub use hash_join::*; +pub use sort::*; pub use transform_add_computed_columns::TransformAddComputedColumns; pub 
use transform_add_const_columns::TransformAddConstColumns; pub use transform_add_internal_columns::TransformAddInternalColumns; @@ -55,7 +56,6 @@ pub use transform_expression_scan::TransformExpressionScan; pub use transform_filter::TransformFilter; pub use transform_limit::TransformLimit; pub use transform_merge_block::TransformMergeBlock; -pub use transform_merge_sort::*; pub use transform_null_if::TransformNullIf; pub use transform_recursive_cte_scan::TransformRecursiveCteScan; pub use transform_recursive_cte_source::TransformRecursiveCteSource; diff --git a/src/query/service/src/pipelines/processors/transforms/sort/bounds.rs b/src/query/service/src/pipelines/processors/transforms/sort/bounds.rs new file mode 100644 index 0000000000000..6f20fb9faf1ef --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/sort/bounds.rs @@ -0,0 +1,316 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_exception::Result; +use databend_common_expression::types::DataType; +use databend_common_expression::Column; +use databend_common_expression::DataBlock; +use databend_common_expression::DataField; +use databend_common_expression::DataSchema; +use databend_common_expression::Scalar; +use databend_common_expression::SortColumnDescription; +use databend_common_pipeline_transforms::sort::LoserTreeMerger; +use databend_common_pipeline_transforms::sort::Rows; +use databend_common_pipeline_transforms::sort::SortedStream; + +#[derive(Debug, PartialEq, Eq, Default, Clone)] +pub struct Bounds( + // stored in reverse order of Column. + Vec, +); + +impl Bounds { + pub fn new_unchecked(column: Column) -> Bounds { + Bounds(vec![column]) + } + + pub fn from_column(column: Column) -> Result { + let block = DataBlock::sort( + &DataBlock::new_from_columns(vec![column]), + &[SortColumnDescription { + offset: 0, + asc: R::IS_ASC_COLUMN, + nulls_first: false, + }], + None, + )?; + + Ok(Bounds(vec![block.get_last_column().clone()])) + } + + pub fn merge(mut vector: Vec, batch_rows: usize) -> Result { + match vector.len() { + 0 => Ok(Bounds(vec![])), + 1 => Ok(vector.pop().unwrap()), + _ => { + let schema = DataSchema::new(vec![DataField::new("order_col", R::data_type())]); + let mut merger = + LoserTreeMerger::::create(schema.into(), vector, batch_rows, None); + + let mut blocks = Vec::new(); + while let Some(block) = merger.next_block()? 
{ + blocks.push(block) + } + debug_assert!(merger.is_finished()); + + Ok(Bounds( + blocks + .iter() + .rev() + .map(|b| b.get_last_column().clone()) + .collect(), + )) + } + } + } + + pub fn next_bound(&mut self) -> Option { + let last = self.0.last_mut()?; + match last.len() { + 0 => unreachable!(), + 1 => { + let bound = last.index(0).unwrap().to_owned(); + self.0.pop(); + Some(bound) + } + _ => { + let bound = last.index(0).unwrap().to_owned(); + *last = last.slice(1..last.len()); + Some(bound) + } + } + } + + pub fn len(&self) -> usize { + self.0.iter().map(Column::len).sum() + } + + pub fn is_empty(&self) -> bool { + self.0.iter().all(|col| col.len() == 0) + } + + pub fn reduce(&self, n: usize, data_type: DataType) -> Option { + if n == 0 { + return Some(Self::default()); + } + let total = self.len(); + if n >= total { + return None; + } + + let step = total / n; + let offset = step / 2; + let indices = self + .0 + .iter() + .enumerate() + .rev() + .flat_map(|(b_idx, col)| std::iter::repeat_n(b_idx, col.len()).zip(0..col.len())) + .enumerate() + .take(step * n) + .filter_map(|(i, (block, row))| { + if i % step == offset { + Some((block as u32, row as u32, 1)) + } else { + None + } + }) + .collect::>(); + + Some(Bounds(vec![Column::take_column_indices( + &self.0, + data_type, + &indices, + indices.len(), + )])) + } + + pub fn dedup_reduce(&self, n: usize) -> Self { + if n == 0 { + return Self::default(); + } + let total = self.len(); + let mut step = total as f64 / n as f64; + let mut target = step / 2.0; + let mut indices = Vec::with_capacity(n); + let mut last: Option<(R, _)> = None; + for (i, (b_idx, r_idx)) in self + .0 + .iter() + .enumerate() + .rev() + .flat_map(|(b_idx, col)| std::iter::repeat_n(b_idx, col.len()).zip(0..col.len())) + .enumerate() + { + if indices.len() >= n { + break; + } + if (i as f64) < target { + continue; + } + + let cur_rows = R::from_column(&self.0[b_idx]).unwrap(); + if last + .as_ref() + .map(|(last_rows, last_idx)| cur_rows.row(r_idx) == last_rows.row(*last_idx)) + .unwrap_or_default() + { + continue; + } + + indices.push((b_idx as u32, r_idx as u32, 1)); + target += step; + if (i as f64) > target && indices.len() < n { + step = (total - i) as f64 / (n - indices.len()) as f64; + target = i as f64 + step / 2.0; + } + last = Some((cur_rows, r_idx)); + } + + Bounds(vec![Column::take_column_indices( + &self.0, + R::data_type(), + &indices, + indices.len(), + )]) + } +} + +impl SortedStream for Bounds { + fn next(&mut self) -> Result<(Option<(DataBlock, Column)>, bool)> { + match self.0.pop() { + Some(column) => Ok(( + Some((DataBlock::new_from_columns(vec![column.clone()]), column)), + false, + )), + None => Ok((None, false)), + } + } +} + +#[cfg(test)] +mod tests { + use databend_common_expression::types::ArgType; + use databend_common_expression::types::Int32Type; + use databend_common_expression::FromData; + use databend_common_pipeline_transforms::sort::SimpleRowsAsc; + use databend_common_pipeline_transforms::sort::SimpleRowsDesc; + + use super::*; + + fn int32_columns(data: T) -> Vec + where T: IntoIterator> { + data.into_iter().map(Int32Type::from_data).collect() + } + + #[test] + fn test_merge() -> Result<()> { + { + let column = Int32Type::from_data(vec![0, 7, 6, 6, 6]); + let bounds = Bounds::from_column::>(column)?; + assert_eq!( + bounds, + Bounds(vec![Int32Type::from_data(vec![0, 6, 6, 6, 7])]) + ); + + let vector = vec![ + bounds, + Bounds::default(), + Bounds::from_column::>(Int32Type::from_data(vec![ + 0, 1, 2, + ])) + .unwrap(), + ]; + 
let bounds = Bounds::merge::>(vector, 3)?; + + assert_eq!( + bounds, + Bounds(int32_columns([vec![6, 7], vec![2, 6, 6], vec![0, 0, 1]])) + ); + } + + { + let data = [vec![77, -2, 7], vec![3, 8, 6, 1, 1], vec![2]]; + let data = data + .into_iter() + .map(|v| Bounds::from_column::>(Int32Type::from_data(v))) + .collect::>>()?; + let bounds = Bounds::merge::>(data, 2)?; + + assert_eq!( + bounds, + Bounds(int32_columns([ + vec![-2], + vec![1, 1], + vec![3, 2], + vec![7, 6], + vec![77, 8] + ])) + ); + } + + Ok(()) + } + + #[test] + fn test_reduce() -> Result<()> { + let data = vec![vec![77, -2, 7], vec![3, 8, 6, 1, 1], vec![2]]; + + let data = data + .into_iter() + .map(|v| Bounds::from_column::>(Int32Type::from_data(v))) + .collect::>>()?; + let bounds = Bounds::merge::>(data, 2)?; + + let got = bounds.reduce(4, Int32Type::data_type()).unwrap(); + assert_eq!(got, Bounds(int32_columns([vec![8, 6, 2, 1]]))); // 77 _8 7 _6 3 _2 1 _1 -2 + + let got = bounds.reduce(3, Int32Type::data_type()).unwrap(); + assert_eq!(got, Bounds(int32_columns([vec![8, 3, 1]]))); // 77 _8 7 6 _3 2 1 _1 -2 + + let got = bounds.reduce(2, Int32Type::data_type()).unwrap(); + assert_eq!(got, Bounds(int32_columns([vec![7, 1]]))); // 77 8 _7 6 3 2 _1 1 -2 + + let got = bounds.reduce(1, Int32Type::data_type()).unwrap(); + assert_eq!(got, Bounds(int32_columns([vec![3]]))); // 77 8 7 6 _3 2 1 1 -2 + + Ok(()) + } + + #[test] + fn test_dedup_reduce() -> Result<()> { + let column = Int32Type::from_data(vec![1, 2, 2, 3, 3, 3, 4, 5, 5]); + let bounds = Bounds::new_unchecked(column); + let reduced = bounds.dedup_reduce::>(3); + assert_eq!(reduced, Bounds(int32_columns([vec![2, 3, 5]]))); + + let column = Int32Type::from_data(vec![5, 5, 4, 3, 3, 3, 2, 2, 1]); + let bounds = Bounds::new_unchecked(column); + let reduced = bounds.dedup_reduce::>(3); + assert_eq!(reduced, Bounds(int32_columns([vec![4, 3, 1]]))); + + let bounds = Bounds(int32_columns([vec![5, 6, 7, 7], vec![3, 3, 4, 5], vec![ + 1, 2, 2, 3, + ]])); + let reduced = bounds.dedup_reduce::>(5); + assert_eq!(reduced, Bounds(int32_columns([vec![2, 3, 4, 6, 7]]))); + + let column = Int32Type::from_data(vec![1, 1, 1, 1, 1]); + let bounds = Bounds(vec![column]); + let reduced = bounds.dedup_reduce::>(3); + assert_eq!(reduced, Bounds(int32_columns([vec![1]]))); + + Ok(()) + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/transform_merge_sort.rs b/src/query/service/src/pipelines/processors/transforms/sort/merge_sort.rs similarity index 90% rename from src/query/service/src/pipelines/processors/transforms/transform_merge_sort.rs rename to src/query/service/src/pipelines/processors/transforms/sort/merge_sort.rs index 413e7fa1c3fd7..ac0abeec46ec0 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_merge_sort.rs +++ b/src/query/service/src/pipelines/processors/transforms/sort/merge_sort.rs @@ -37,16 +37,13 @@ use databend_common_pipeline_transforms::MergeSort; use databend_common_pipeline_transforms::SortSpillParams; use databend_common_pipeline_transforms::TransformSortMergeLimit; +use super::sort_spill::create_memory_merger; +use super::sort_spill::MemoryMerger; +use super::sort_spill::SortSpill; +use super::Base; +use super::MemoryRows; use crate::spillers::Spiller; -mod sort_spill; -use sort_spill::create_memory_merger; -use sort_spill::MemoryMerger; -use sort_spill::SortSpill; - -mod builder; -pub use builder::TransformSortBuilder; - #[derive(Debug)] enum State { /// This state means the processor will collect incoming blocks. 
@@ -57,14 +54,6 @@ enum State { Finish, } -#[derive(Clone)] -struct Base { - schema: DataSchemaRef, - spiller: Arc, - sort_row_offset: usize, - limit: Option, -} - enum Inner { Collect(Vec), Limit(TransformSortMergeLimit), @@ -104,8 +93,7 @@ where A: SortAlgorithm, C: RowConverter, { - #[allow(clippy::too_many_arguments)] - fn new( + pub(super) fn new( input: Arc, output: Arc, schema: DataSchemaRef, @@ -152,14 +140,9 @@ where } fn generate_order_column(&self, mut block: DataBlock) -> Result<(A::Rows, DataBlock)> { - let order_by_cols = self - .sort_desc - .iter() - .map(|desc| block.get_by_offset(desc.offset).clone()) - .collect::>(); let rows = self .row_converter - .convert(&order_by_cols, block.num_rows())?; + .convert_data_block(&self.sort_desc, &block)?; let order_col = rows.to_column(); block.add_column(BlockEntry { data_type: order_col.data_type(), @@ -168,7 +151,7 @@ where Ok((rows, block)) } - fn prepare_spill_limit(&mut self) -> Result<()> { + fn limit_trans_to_spill(&mut self) -> Result<()> { let Inner::Limit(merger) = &self.inner else { unreachable!() }; @@ -182,7 +165,7 @@ where Ok(()) } - fn prepare_spill(&mut self, input_data: Vec) { + fn collect_trans_to_spill(&mut self, input_data: Vec) { let (num_rows, num_bytes) = input_data .iter() .map(|block| (block.num_rows(), block.memory_size())) @@ -194,11 +177,28 @@ where self.inner = Inner::Spill(input_data, spill_sort); } + fn trans_to_spill(&mut self) -> Result<()> { + match &mut self.inner { + Inner::Limit(_) => self.limit_trans_to_spill(), + Inner::Collect(input_data) => { + let input_data = std::mem::take(input_data); + self.collect_trans_to_spill(input_data); + Ok(()) + } + Inner::Spill(_, _) => Ok(()), + Inner::Memory(_) => unreachable!(), + } + } + fn determine_params(&self, bytes: usize, rows: usize) -> SortSpillParams { // We use the first memory calculation to estimate the batch size and the number of merge. let unit_size = self.memory_settings.spill_unit_size; let num_merge = bytes.div_ceil(unit_size).max(2); let batch_rows = rows.div_ceil(num_merge); + + /// The memory will be doubled during merging. + const MERGE_RATIO: usize = 2; + let num_merge = num_merge.div_ceil(MERGE_RATIO).max(2); log::info!("determine sort spill params, buffer_bytes: {bytes}, buffer_rows: {rows}, spill_unit_size: {unit_size}, batch_rows: {batch_rows}, batch_num_merge {num_merge}"); SortSpillParams { batch_rows, @@ -307,16 +307,6 @@ where } } -trait MemoryRows { - fn in_memory_rows(&self) -> usize; -} - -impl MemoryRows for Vec { - fn in_memory_rows(&self) -> usize { - self.iter().map(|s| s.num_rows()).sum::() - } -} - #[async_trait::async_trait] impl Processor for TransformSort where @@ -363,6 +353,7 @@ where return match self.state { State::Collect => { if self.check_spill() { + // delay the handle of input until the next call. 
Ok(Event::Async) } else { Ok(Event::Sync) @@ -433,18 +424,7 @@ where match &self.state { State::Collect => { let finished = self.input.is_finished(); - match &mut self.inner { - Inner::Limit(_) => { - self.prepare_spill_limit()?; - } - Inner::Collect(input_data) => { - debug_assert!(!finished); - let input_data = std::mem::take(input_data); - self.prepare_spill(input_data); - } - Inner::Spill(_, _) => (), - Inner::Memory(_) => unreachable!(), - }; + self.trans_to_spill()?; let input = self.input_rows(); let Inner::Spill(input_data, spill_sort) = &mut self.inner else { @@ -455,7 +435,7 @@ where if memory_rows > 0 && memory_rows + input > max { spill_sort - .subsequent_spill_last(memory_rows + input - max) + .collect_spill_last(memory_rows + input - max) .await?; } if input > max || finished && input > 0 { @@ -470,7 +450,7 @@ where let Inner::Spill(input_data, spill_sort) = &mut self.inner else { unreachable!() }; - debug_assert!(input_data.is_empty()); + assert!(input_data.is_empty()); let (block, finish) = spill_sort.on_restore().await?; self.output_data.extend(block); if finish { diff --git a/src/query/service/src/pipelines/processors/transforms/sort/mod.rs b/src/query/service/src/pipelines/processors/transforms/sort/mod.rs new file mode 100644 index 0000000000000..3d7c57e31beb0 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/sort/mod.rs @@ -0,0 +1,79 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use bounds::Bounds; +use databend_common_expression::local_block_meta_serde; +use databend_common_expression::BlockMetaInfo; +use databend_common_expression::DataBlock; +use databend_common_expression::DataSchemaRef; +use databend_common_pipeline_transforms::SortSpillParams; +use sort_spill::SpillableBlock; + +use crate::spillers::Spiller; + +mod bounds; +mod merge_sort; +mod sort_builder; +mod sort_collect; +mod sort_combine; +mod sort_exchange; +mod sort_execute; +mod sort_route; +mod sort_shuffle; +mod sort_spill; + +pub use sort_builder::*; +pub use sort_exchange::*; +pub use sort_route::*; +pub use sort_shuffle::*; + +#[derive(Clone)] +struct Base { + schema: DataSchemaRef, + spiller: Arc, + sort_row_offset: usize, + limit: Option, +} + +#[derive(Debug)] +struct SortCollectedMeta { + params: SortSpillParams, + bounds: Bounds, + blocks: Vec>, +} + +local_block_meta_serde!(SortCollectedMeta); + +#[typetag::serde(name = "sort_collected")] +impl BlockMetaInfo for SortCollectedMeta {} + +#[derive(Debug)] +struct SortScatteredMeta(pub Vec>); + +local_block_meta_serde!(SortScatteredMeta); + +#[typetag::serde(name = "sort_scattered")] +impl BlockMetaInfo for SortScatteredMeta {} + +trait MemoryRows { + fn in_memory_rows(&self) -> usize; +} + +impl MemoryRows for Vec { + fn in_memory_rows(&self) -> usize { + self.iter().map(|s| s.num_rows()).sum::() + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/sort/sort_builder.rs b/src/query/service/src/pipelines/processors/transforms/sort/sort_builder.rs new file mode 100644 index 0000000000000..4bfbe9f9646ff --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/sort/sort_builder.rs @@ -0,0 +1,361 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use databend_common_exception::Result; +use databend_common_expression::DataSchemaRef; +use databend_common_expression::SortColumnDescription; +use databend_common_pipeline_core::processors::InputPort; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::Processor; +use databend_common_pipeline_core::processors::ProcessorPtr; +use databend_common_pipeline_core::Pipeline; +use databend_common_pipeline_transforms::processors::sort::algorithm::SortAlgorithm; +use databend_common_pipeline_transforms::sort::algorithm::HeapSort; +use databend_common_pipeline_transforms::sort::algorithm::LoserTreeSort; +use databend_common_pipeline_transforms::sort::select_row_type; +use databend_common_pipeline_transforms::sort::utils::add_order_field; +use databend_common_pipeline_transforms::sort::utils::ORDER_COL_NAME; +use databend_common_pipeline_transforms::sort::RowConverter; +use databend_common_pipeline_transforms::sort::Rows; +use databend_common_pipeline_transforms::sort::RowsTypeVisitor; +use databend_common_pipeline_transforms::AccumulatingTransformer; +use databend_common_pipeline_transforms::MemorySettings; + +use super::merge_sort::TransformSort; +use super::sort_collect::TransformSortCollect; +use super::sort_combine::TransformSortCombine; +use super::sort_execute::TransformSortExecute; +use super::sort_shuffle::SortSampleState; +use super::sort_shuffle::TransformSortShuffle; +use super::Base; +use crate::spillers::Spiller; + +enum SortType { + Sort, + Collect, + Execute, + Shuffle, + Combine, +} + +pub struct TransformSortBuilder { + schema: DataSchemaRef, + block_size: usize, + sort_desc: Arc<[SortColumnDescription]>, + order_col_generated: bool, + output_order_col: bool, + memory_settings: MemorySettings, + spiller: Arc, + enable_loser_tree: bool, + limit: Option, +} + +impl TransformSortBuilder { + pub fn create( + schema: DataSchemaRef, + sort_desc: Arc<[SortColumnDescription]>, + block_size: usize, + spiller: Arc, + ) -> Self { + TransformSortBuilder { + block_size, + schema, + sort_desc, + spiller, + order_col_generated: false, + output_order_col: false, + enable_loser_tree: false, + limit: None, + memory_settings: MemorySettings::disable_spill(), + } + } + + pub fn with_order_col_generated(mut self, order_col_generated: bool) -> Self { + self.order_col_generated = order_col_generated; + self + } + + pub fn with_output_order_col(mut self, output_order_col: bool) -> Self { + self.output_order_col = output_order_col; + self + } + + pub fn with_limit(mut self, limit: Option) -> Self { + self.limit = limit; + self + } + + pub fn with_memory_settings(mut self, memory_settings: MemorySettings) -> Self { + self.memory_settings = memory_settings; + self + } + + pub fn with_enable_loser_tree(mut self, enable_loser_tree: bool) -> Self { + self.enable_loser_tree = enable_loser_tree; + self + } + + pub fn build( + &self, + input: Arc, + output: Arc, + ) -> Result> { + self.check(); + + let mut build = Build { + params: self, + input, + output, + typ: SortType::Sort, + id: 0, + state: None, + }; + + select_row_type(&mut build) + } + + pub fn build_collect( + &self, + input: Arc, + output: Arc, + ) -> Result> { + self.check(); + + let mut build = Build { + params: self, + input, + output, + typ: SortType::Collect, + id: 0, + state: None, + }; + + select_row_type(&mut build) + } + + pub fn build_exec( + &self, + input: Arc, + output: Arc, + ) -> Result> { + self.check(); + + let mut build = Build { + params: self, + input, 
+ output, + typ: SortType::Execute, + id: 0, + state: None, + }; + + select_row_type(&mut build) + } + + pub fn build_shuffle( + &self, + input: Arc, + output: Arc, + id: usize, + state: Arc, + ) -> Result> { + self.check(); + + let mut build = Build { + params: self, + input, + output, + typ: SortType::Shuffle, + id, + state: Some(state), + }; + + select_row_type(&mut build) + } + + pub fn build_combine( + &self, + input: Arc, + output: Arc, + ) -> Result> { + self.check(); + + let mut build = Build { + params: self, + input, + output, + typ: SortType::Combine, + id: 0, + state: None, + }; + + select_row_type(&mut build) + } + + fn should_use_sort_limit(&self) -> bool { + self.limit.map(|limit| limit < 10000).unwrap_or_default() + } + + fn check(&self) { + assert!(if self.output_order_col { + self.schema.has_field(ORDER_COL_NAME) + } else { + !self.schema.has_field(ORDER_COL_NAME) + }); + } + + fn new_base(&self) -> Base { + let schema = self.inner_schema(); + let sort_row_offset = schema.fields().len() - 1; + Base { + sort_row_offset, + schema, + spiller: self.spiller.clone(), + limit: self.limit, + } + } + + pub fn inner_schema(&self) -> DataSchemaRef { + add_order_field(self.schema.clone(), &self.sort_desc) + } + + pub fn add_shuffle(&self, pipeline: &mut Pipeline, state: Arc) -> Result<()> { + use std::sync::atomic; + let i = atomic::AtomicUsize::new(0); + pipeline.add_transform(|input, output| { + let id = i.fetch_add(1, atomic::Ordering::AcqRel); + Ok(ProcessorPtr::create(self.build_shuffle( + input, + output, + id, + state.clone(), + )?)) + }) + } +} + +struct Build<'a> { + params: &'a TransformSortBuilder, + typ: SortType, + input: Arc, + output: Arc, + id: usize, + state: Option>, +} + +impl Build<'_> { + fn build_sort(&mut self, limit_sort: bool) -> Result> + where + A: SortAlgorithm + 'static, + C: RowConverter + Send + 'static, + { + let schema = add_order_field(self.params.schema.clone(), &self.params.sort_desc); + Ok(Box::new(TransformSort::::new( + self.input.clone(), + self.output.clone(), + schema, + self.params.sort_desc.clone(), + self.params.block_size, + self.params.limit.map(|limit| (limit, limit_sort)), + self.params.spiller.clone(), + self.params.output_order_col, + self.params.order_col_generated, + self.params.memory_settings.clone(), + )?)) + } + + fn build_sort_collect(&mut self, limit_sort: bool) -> Result> + where + A: SortAlgorithm + 'static, + C: RowConverter + Send + 'static, + { + Ok(Box::new(TransformSortCollect::::new( + self.input.clone(), + self.output.clone(), + self.params.new_base(), + self.params.sort_desc.clone(), + self.params.block_size, + limit_sort, + self.params.order_col_generated, + self.params.memory_settings.clone(), + )?)) + } + + fn build_sort_exec(&mut self) -> Result> + where A: SortAlgorithm + 'static { + Ok(Box::new(TransformSortExecute::::new( + self.input.clone(), + self.output.clone(), + self.params.new_base(), + self.params.output_order_col, + )?)) + } + + fn build_sort_shuffle(&mut self) -> Result> + where R: Rows + 'static { + Ok(Box::new(TransformSortShuffle::::new( + self.input.clone(), + self.output.clone(), + self.id, + self.state.clone().unwrap(), + self.params.spiller.clone(), + ))) + } + + fn build_sort_combine(&mut self) -> Result> + where R: Rows + 'static { + Ok(AccumulatingTransformer::create( + self.input.clone(), + self.output.clone(), + TransformSortCombine::::new(self.params.block_size), + )) + } +} + +impl RowsTypeVisitor for Build<'_> { + type Result = Result>; + fn schema(&self) -> DataSchemaRef { + 
self.params.schema.clone() + } + + fn sort_desc(&self) -> &[SortColumnDescription] { + &self.params.sort_desc + } + + fn visit_type(&mut self) -> Self::Result + where + R: Rows + 'static, + C: RowConverter + Send + 'static, + { + let limit_sort = self.params.should_use_sort_limit(); + match self.typ { + SortType::Sort => match self.params.enable_loser_tree { + true => self.build_sort::, C>(limit_sort), + false => self.build_sort::, C>(limit_sort), + }, + SortType::Collect => match self.params.enable_loser_tree { + true => self.build_sort_collect::, C>(limit_sort), + false => self.build_sort_collect::, C>(limit_sort), + }, + SortType::Execute => match self.params.enable_loser_tree { + true => self.build_sort_exec::>(), + false => self.build_sort_exec::>(), + }, + SortType::Shuffle => self.build_sort_shuffle::(), + SortType::Combine => self.build_sort_combine::(), + } + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/sort/sort_collect.rs b/src/query/service/src/pipelines/processors/transforms/sort/sort_collect.rs new file mode 100644 index 0000000000000..dbf46ca817829 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/sort/sort_collect.rs @@ -0,0 +1,362 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::sync::atomic; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; + +use databend_common_exception::Result; +use databend_common_expression::BlockEntry; +use databend_common_expression::DataBlock; +use databend_common_expression::SortColumnDescription; +use databend_common_expression::Value; +use databend_common_pipeline_core::processors::Event; +use databend_common_pipeline_core::processors::InputPort; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::Processor; +use databend_common_pipeline_transforms::processors::sort::algorithm::SortAlgorithm; +use databend_common_pipeline_transforms::sort::RowConverter; +use databend_common_pipeline_transforms::sort::Rows; +use databend_common_pipeline_transforms::MemorySettings; +use databend_common_pipeline_transforms::MergeSort; +use databend_common_pipeline_transforms::SortSpillParams; +use databend_common_pipeline_transforms::TransformSortMergeLimit; + +use super::sort_spill::SortSpill; +use super::Base; +use super::MemoryRows; + +enum Inner { + Collect(Vec), + Limit(TransformSortMergeLimit), + Spill(Vec, SortSpill), + None, +} + +pub struct TransformSortCollect { + name: &'static str, + input: Arc, + output: Arc, + output_data: Option, + + max_block_size: usize, + row_converter: C, + sort_desc: Arc<[SortColumnDescription]>, + /// If this transform is after an Exchange transform, + /// it means it will compact the data from cluster nodes. + /// And the order column is already generated in each cluster node, + /// so we don't need to generate the order column again. 
+ order_col_generated: bool, + + base: Base, + inner: Inner, + + aborting: AtomicBool, + + memory_settings: MemorySettings, +} + +impl TransformSortCollect +where + A: SortAlgorithm, + C: RowConverter, +{ + pub(super) fn new( + input: Arc, + output: Arc, + base: Base, + sort_desc: Arc<[SortColumnDescription]>, + max_block_size: usize, + sort_limit: bool, + order_col_generated: bool, + memory_settings: MemorySettings, + ) -> Result { + let row_converter = C::create(&sort_desc, base.schema.clone())?; + let (name, inner) = match base.limit { + Some(limit) if sort_limit => ( + "TransformSortMergeCollectLimit", + Inner::Limit(TransformSortMergeLimit::create(max_block_size, limit)), + ), + _ => ("TransformSortMergeCollect", Inner::Collect(vec![])), + }; + Ok(Self { + input, + output, + name, + row_converter, + output_data: None, + sort_desc, + order_col_generated, + base, + inner, + aborting: AtomicBool::new(false), + memory_settings, + max_block_size, + }) + } + + fn generate_order_column(&self, mut block: DataBlock) -> Result<(A::Rows, DataBlock)> { + let rows = self + .row_converter + .convert_data_block(&self.sort_desc, &block)?; + let order_col = rows.to_column(); + block.add_column(BlockEntry { + data_type: order_col.data_type(), + value: Value::Column(order_col), + }); + Ok((rows, block)) + } + + fn limit_trans_to_spill(&mut self, no_spill: bool) -> Result<()> { + let Inner::Limit(merger) = &self.inner else { + unreachable!() + }; + assert!(merger.num_rows() > 0); + let params = if no_spill { + SortSpillParams { + batch_rows: self.max_block_size, + num_merge: merger.num_rows().div_ceil(self.max_block_size).max(2), + } + } else { + self.determine_params(merger.num_bytes(), merger.num_rows()) + }; + let Inner::Limit(merger) = &mut self.inner else { + unreachable!() + }; + let blocks = merger.prepare_spill(params.batch_rows)?; + let spill_sort = SortSpill::new(self.base.clone(), params); + self.inner = Inner::Spill(blocks, spill_sort); + Ok(()) + } + + fn collect_trans_to_spill(&mut self, input_data: Vec, no_spill: bool) { + let (num_rows, num_bytes) = input_data + .iter() + .map(|block| (block.num_rows(), block.memory_size())) + .fold((0, 0), |(acc_rows, acc_bytes), (rows, bytes)| { + (acc_rows + rows, acc_bytes + bytes) + }); + assert!(num_rows > 0); + let params = if no_spill { + SortSpillParams { + batch_rows: self.max_block_size, + num_merge: num_rows.div_ceil(self.max_block_size).max(2), + } + } else { + self.determine_params(num_bytes, num_rows) + }; + let spill_sort = SortSpill::new(self.base.clone(), params); + self.inner = Inner::Spill(input_data, spill_sort); + } + + fn trans_to_spill(&mut self, no_spill: bool) -> Result<()> { + match &mut self.inner { + Inner::Limit(_) => self.limit_trans_to_spill(no_spill), + Inner::Collect(input_data) => { + let input_data = std::mem::take(input_data); + self.collect_trans_to_spill(input_data, no_spill); + Ok(()) + } + Inner::Spill(_, _) => Ok(()), + Inner::None => unreachable!(), + } + } + + fn determine_params(&self, bytes: usize, rows: usize) -> SortSpillParams { + // We use the first memory calculation to estimate the batch size and the number of merge. + let unit_size = self.memory_settings.spill_unit_size; + let num_merge = bytes.div_ceil(unit_size).max(2); + let batch_rows = rows.div_ceil(num_merge); + + /// The memory will be doubled during merging. 
+ const MERGE_RATIO: usize = 2; + let num_merge = num_merge.div_ceil(MERGE_RATIO).max(2); + log::info!("determine sort spill params, buffer_bytes: {bytes}, buffer_rows: {rows}, spill_unit_size: {unit_size}, batch_rows: {batch_rows}, batch_num_merge {num_merge}"); + SortSpillParams { + batch_rows, + num_merge, + } + } + + fn collect_block(&mut self, block: DataBlock) -> Result<()> { + if self.order_col_generated { + return match &mut self.inner { + Inner::Limit(limit_sort) => { + let rows = A::Rows::from_column(block.get_last_column())?; + limit_sort.add_block(block, rows) + } + Inner::Collect(input_data) | Inner::Spill(input_data, _) => { + input_data.push(block); + Ok(()) + } + _ => unreachable!(), + }; + } + + let (rows, block) = self.generate_order_column(block)?; + match &mut self.inner { + Inner::Limit(limit_sort) => limit_sort.add_block(block, rows), + Inner::Collect(input_data) | Inner::Spill(input_data, _) => { + input_data.push(block); + Ok(()) + } + _ => unreachable!(), + } + } + + fn check_spill(&self) -> bool { + if !self.memory_settings.check_spill() { + return false; + } + + match &self.inner { + Inner::Limit(limit_sort) => { + limit_sort.num_bytes() > self.memory_settings.spill_unit_size * 2 + } + Inner::Collect(input_data) => { + input_data.iter().map(|b| b.memory_size()).sum::() + > self.memory_settings.spill_unit_size * 2 + } + Inner::Spill(input_data, sort_spill) => { + input_data.in_memory_rows() > sort_spill.max_rows() + } + _ => unreachable!(), + } + } + + fn create_output(&mut self) -> Result<()> { + let Inner::Spill(input_data, spill_sort) = std::mem::replace(&mut self.inner, Inner::None) + else { + unreachable!() + }; + assert!(input_data.is_empty()); + + let meta = spill_sort.dump_collect()?; + self.output_data = Some(DataBlock::empty_with_meta(Box::new(meta))); + Ok(()) + } +} + +#[async_trait::async_trait] +impl Processor for TransformSortCollect +where + A: SortAlgorithm + 'static, + A::Rows: 'static, + C: RowConverter + Send + 'static, +{ + fn name(&self) -> String { + self.name.to_string() + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.output.is_finished() { + self.input.finish(); + return Ok(Event::Finished); + } + + if !self.output.can_push() { + self.input.set_not_need_data(); + return Ok(Event::NeedConsume); + } + + if let Some(block) = self.output_data.take() { + assert!(self.input.is_finished()); + self.output.push_data(Ok(block)); + self.output.finish(); + return Ok(Event::Finished); + } + + if self.input.has_data() { + return if self.check_spill() { + // delay the handle of input until the next call. + Ok(Event::Async) + } else { + Ok(Event::Sync) + }; + } + + if self.input.is_finished() { + return match &self.inner { + Inner::Limit(merger) => { + if merger.num_rows() == 0 { + self.output.finish(); + Ok(Event::Finished) + } else { + Ok(Event::Async) + } + } + Inner::Collect(input_data) => { + if input_data.is_empty() { + self.output.finish(); + Ok(Event::Finished) + } else { + Ok(Event::Async) + } + } + Inner::Spill(_, _) => Ok(Event::Async), + Inner::None => unreachable!(), + }; + } + + self.input.set_need_data(); + Ok(Event::NeedData) + } + + fn process(&mut self) -> Result<()> { + if let Some(block) = self.input.pull_data().transpose()? 
{ + self.input.set_need_data(); + if !block.is_empty() { + self.collect_block(block)?; + } + } + Ok(()) + } + + #[async_backtrace::framed] + async fn async_process(&mut self) -> Result<()> { + let finished = self.input.is_finished(); + self.trans_to_spill(finished)?; + + let Inner::Spill(input_data, spill_sort) = &mut self.inner else { + unreachable!() + }; + + let input = input_data.in_memory_rows(); + let memory_rows = spill_sort.collect_memory_rows(); + let max = spill_sort.max_rows(); + + if memory_rows > 0 && memory_rows + input > max { + spill_sort + .collect_spill_last(memory_rows + input - max) + .await?; + } + if input > max || finished && input > 0 { + spill_sort.sort_input_data(std::mem::take(input_data), &self.aborting)?; + } + if finished { + self.create_output() + } else { + Ok(()) + } + } + + fn interrupt(&self) { + self.aborting.store(true, atomic::Ordering::Release); + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/sort/sort_combine.rs b/src/query/service/src/pipelines/processors/transforms/sort/sort_combine.rs new file mode 100644 index 0000000000000..538bd7aff390b --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/sort/sort_combine.rs @@ -0,0 +1,80 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfoDowncast; +use databend_common_expression::DataBlock; +use databend_common_pipeline_transforms::sort::Rows; +use databend_common_pipeline_transforms::AccumulatingTransform; + +use super::bounds::Bounds; +use super::SortCollectedMeta; + +pub struct TransformSortCombine { + batch_rows: usize, + metas: Vec, + _r: std::marker::PhantomData, +} + +impl TransformSortCombine { + pub fn new(batch_rows: usize) -> Self { + Self { + batch_rows, + metas: vec![], + _r: Default::default(), + } + } +} + +impl AccumulatingTransform for TransformSortCombine { + const NAME: &'static str = "TransformSortCombine"; + + fn transform(&mut self, mut data: DataBlock) -> Result> { + self.metas.push( + data.take_meta() + .and_then(SortCollectedMeta::downcast_from) + .expect("require a SortCollectedMeta"), + ); + Ok(vec![]) + } + + fn on_finish(&mut self, output: bool) -> Result> { + if !output || self.metas.is_empty() { + return Ok(vec![]); + } + + let params = self.metas.first().map(|meta| meta.params).unwrap(); + + let bounds = self + .metas + .iter_mut() + .map(|meta| std::mem::take(&mut meta.bounds)) + .collect(); + let bounds = Bounds::merge::(bounds, self.batch_rows)?; + + let blocks = self + .metas + .drain(..) 
+ .flat_map(|meta| meta.blocks.into_iter()) + .collect(); + + Ok(vec![DataBlock::empty_with_meta(Box::new( + SortCollectedMeta { + params, + bounds, + blocks, + }, + ))]) + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/sort/sort_exchange.rs b/src/query/service/src/pipelines/processors/transforms/sort/sort_exchange.rs new file mode 100644 index 0000000000000..700c5a3e0e81a --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/sort/sort_exchange.rs @@ -0,0 +1,44 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfoDowncast; +use databend_common_expression::DataBlock; +use databend_common_pipeline_core::processors::Exchange; + +use super::SortScatteredMeta; + +pub struct SortRangeExchange; + +impl Exchange for SortRangeExchange { + const NAME: &'static str = "SortRange"; + fn partition(&self, mut data: DataBlock, n: usize) -> Result> { + let scattered = data + .take_meta() + .and_then(SortScatteredMeta::downcast_from) + .expect("require a SortScatteredMeta") + .0; + assert!(scattered.len() <= n); + + let blocks = scattered + .into_iter() + .map(|meta| { + meta.map(|meta| DataBlock::empty_with_meta(Box::new(meta))) + .unwrap_or_else(DataBlock::empty) + }) + .collect(); + + Ok(blocks) + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/sort/sort_execute.rs b/src/query/service/src/pipelines/processors/transforms/sort/sort_execute.rs new file mode 100644 index 0000000000000..044858dc4358a --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/sort/sort_execute.rs @@ -0,0 +1,134 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
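+
+//! `TransformSortExecute` rebuilds a `SortSpill` from the `SortCollectedMeta` emitted by
+//! the collect phase and streams the restored, merged blocks to its output port.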
+ +use std::any::Any; +use std::sync::Arc; + +use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfoDowncast; +use databend_common_expression::DataBlock; +use databend_common_pipeline_core::processors::Event; +use databend_common_pipeline_core::processors::InputPort; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::Processor; +use databend_common_pipeline_transforms::processors::sort::algorithm::SortAlgorithm; + +use super::sort_spill::SortSpill; +use super::Base; +use super::SortCollectedMeta; + +pub struct TransformSortExecute { + input: Arc, + output: Arc, + + /// If the next transform of current transform is [`super::transform_multi_sort_merge::MultiSortMergeProcessor`], + /// we can generate and output the order column to avoid the extra converting in the next transform. + remove_order_col: bool, + + base: Base, + inner: Option>, +} + +impl TransformSortExecute +where A: SortAlgorithm +{ + pub(super) fn new( + input: Arc, + output: Arc, + base: Base, + output_order_col: bool, + ) -> Result { + Ok(Self { + input, + output, + remove_order_col: !output_order_col, + base, + inner: None, + }) + } + + fn output_block(&self, mut block: DataBlock) { + if self.remove_order_col { + block.pop_columns(1); + } + self.output.push_data(Ok(block)); + } +} + +#[async_trait::async_trait] +impl Processor for TransformSortExecute +where + A: SortAlgorithm + 'static, + A::Rows: 'static, +{ + fn name(&self) -> String { + "TransformSortExecute".to_string() + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.output.is_finished() { + self.input.finish(); + return Ok(Event::Finished); + } + + if !self.output.can_push() { + self.input.set_not_need_data(); + return Ok(Event::NeedConsume); + } + + if self.input.is_finished() && self.inner.is_none() { + self.output.finish(); + return Ok(Event::Finished); + } + + if let Some(mut block) = self.input.pull_data().transpose()? { + assert!(self.inner.is_none()); + let meta = block + .take_meta() + .and_then(SortCollectedMeta::downcast_from) + .expect("require a SortCollectedMeta"); + + self.inner = Some(SortSpill::::from_meta(self.base.clone(), meta)); + return Ok(Event::Async); + } + + if self.inner.is_some() { + return Ok(Event::Async); + } + + self.input.set_need_data(); + Ok(Event::NeedData) + } + + #[async_backtrace::framed] + async fn async_process(&mut self) -> Result<()> { + let Some(spill_sort) = &mut self.inner else { + unreachable!() + }; + let (block, finish) = spill_sort.on_restore().await?; + if let Some(block) = block { + assert!(!self.output.has_data()); + self.output_block(block); + } + if finish { + self.output.finish(); + self.inner = None; + } + Ok(()) + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/sort/sort_route.rs b/src/query/service/src/pipelines/processors/transforms/sort/sort_route.rs new file mode 100644 index 0000000000000..a9592f4cd405a --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/sort/sort_route.rs @@ -0,0 +1,120 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::sync::Arc; + +use databend_common_exception::Result; +use databend_common_pipeline_core::processors::Event; +use databend_common_pipeline_core::processors::InputPort; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::Processor; +use databend_common_pipeline_core::processors::ProcessorPtr; +use databend_common_pipeline_core::Pipe; +use databend_common_pipeline_core::PipeItem; +use databend_common_pipeline_core::Pipeline; + +struct TransformSortRoute { + inputs: Vec>, + output: Arc, + cur_input: usize, +} + +impl TransformSortRoute { + fn new(inputs: Vec>, output: Arc) -> Self { + Self { + inputs, + output, + cur_input: 0, + } + } + + fn process(&mut self) -> Result<()> { + for (i, input) in self.inputs.iter().enumerate() { + if i != self.cur_input { + if !input.is_finished() && !input.has_data() { + input.set_need_data(); + } + continue; + } + + if input.is_finished() { + self.cur_input = i + 1; + continue; + } + + match input.pull_data() { + Some(data) => self.output.push_data(data), + None => input.set_need_data(), + } + } + + Ok(()) + } +} + +impl Processor for TransformSortRoute { + fn name(&self) -> String { + "SortRoute".to_string() + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.output.is_finished() { + for input in &self.inputs { + input.finish(); + } + return Ok(Event::Finished); + } + + if !self.output.can_push() { + for input in &self.inputs { + input.set_not_need_data(); + } + return Ok(Event::NeedConsume); + } + + self.process()?; + + if self.inputs.iter().all(|input| input.is_finished()) { + self.output.finish(); + return Ok(Event::Finished); + } + + Ok(Event::NeedData) + } +} + +pub fn add_range_shuffle_route(pipeline: &mut Pipeline) -> Result<()> { + let inputs = pipeline.output_len(); + let inputs_port = (0..inputs).map(|_| InputPort::create()).collect::>(); + let output = OutputPort::create(); + + let processor = ProcessorPtr::create(Box::new(TransformSortRoute::new( + inputs_port.clone(), + output.clone(), + ))); + + let pipe = Pipe::create(inputs, 1, vec![PipeItem::create( + processor, + inputs_port, + vec![output], + )]); + + pipeline.add_pipe(pipe); + Ok(()) +} diff --git a/src/query/service/src/pipelines/processors/transforms/sort/sort_shuffle.rs b/src/query/service/src/pipelines/processors/transforms/sort/sort_shuffle.rs new file mode 100644 index 0000000000000..41c4ae5360132 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/sort/sort_shuffle.rs @@ -0,0 +1,297 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::assert_matches::assert_matches; +use std::marker::PhantomData; +use std::sync::Arc; +use std::sync::RwLock; + +use databend_common_base::base::WatchNotify; +use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfoDowncast; +use databend_common_expression::DataBlock; +use databend_common_expression::DataSchemaRef; +use databend_common_pipeline_transforms::processors::sort::Rows; + +use super::bounds::Bounds; +use super::Base; +use super::SortCollectedMeta; +use super::SortScatteredMeta; +use crate::pipelines::processors::Event; +use crate::pipelines::processors::InputPort; +use crate::pipelines::processors::OutputPort; +use crate::pipelines::processors::Processor; +use crate::spillers::Spiller; + +#[derive(Debug)] +enum Step { + None, + Meta(Box), + Scattered(Vec>), +} + +pub struct TransformSortShuffle { + input: Arc, + output: Arc, + id: usize, + step: Step, + state: Arc, + spiller: Arc, + _r: PhantomData, +} + +impl TransformSortShuffle { + pub fn new( + input: Arc, + output: Arc, + id: usize, + state: Arc, + spiller: Arc, + ) -> Self { + Self { + input, + output, + id, + state, + spiller, + step: Step::None, + _r: PhantomData, + } + } + + async fn scatter(&mut self) -> Result>> { + let SortCollectedMeta { + params, + bounds, + blocks, + } = match std::mem::replace(&mut self.step, Step::None) { + Step::None => { + return Ok(vec![]); + } + Step::Meta(box meta) => meta, + _ => unreachable!(), + }; + + let scatter_bounds = self.state.bounds(); + if scatter_bounds.is_empty() { + return Ok(vec![Some(SortCollectedMeta { + params, + bounds, + blocks, + })]); + } + + let base = { + let inner = self.state.inner.read().unwrap(); + Base { + schema: inner.schema.clone(), + spiller: self.spiller.clone(), + sort_row_offset: inner.schema.fields.len() - 1, + limit: None, + } + }; + + let mut scattered_blocks = std::iter::repeat_with(Vec::new) + .take(scatter_bounds.len() + 1) + .collect::>(); + for blocks in blocks { + let scattered = base + .scatter_stream::(Vec::from(blocks).into(), scatter_bounds.clone()) + .await?; + for (i, part) in scattered.into_iter().enumerate() { + if !part.is_empty() { + scattered_blocks[i].push(part.into_boxed_slice()); + } + } + } + + let scattered_meta = scattered_blocks + .into_iter() + .map(|blocks| { + (!blocks.is_empty()).then_some(SortCollectedMeta { + params, + bounds: bounds.clone(), + blocks, + }) + }) + .collect(); + Ok(scattered_meta) + } +} + +#[async_trait::async_trait] +impl Processor for TransformSortShuffle { + fn name(&self) -> String { + "TransformSortShuffle".to_string() + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.output.is_finished() { + self.input.finish(); + return Ok(Event::Finished); + } + + if !self.output.can_push() { + self.input.set_not_need_data(); + return Ok(Event::NeedConsume); + } + + if matches!(self.step, Step::Scattered(_)) { + let Step::Scattered(scattered) = std::mem::replace(&mut self.step, Step::None) else { + unreachable!() + }; + + let data = DataBlock::empty_with_meta(Box::new(SortScatteredMeta(scattered))); + self.output.push_data(Ok(data)); + self.output.finish(); + return Ok(Event::Finished); + } + + if let Some(mut block) = self.input.pull_data().transpose()? 
{ + assert_matches!(self.step, Step::None); + let meta = block + .take_meta() + .and_then(SortCollectedMeta::downcast_from) + .expect("require a SortCollectedMeta"); + + self.step = Step::Meta(Box::new(meta)); + return Ok(Event::Async); + } + + if self.input.is_finished() { + if self.state.done.has_notified() { + self.output.finish(); + Ok(Event::Finished) + } else { + Ok(Event::Async) + } + } else { + self.input.set_need_data(); + Ok(Event::NeedData) + } + } + + #[async_backtrace::framed] + async fn async_process(&mut self) -> Result<()> { + let bounds = match &self.step { + Step::None if self.input.is_finished() => Bounds::default(), + Step::Meta(meta) => meta.generate_bounds(), + _ => unreachable!(), + }; + self.state.commit_sample::(self.id, bounds)?; + self.state.done.notified().await; + self.step = Step::Scattered(self.scatter().await?); + Ok(()) + } +} + +impl SortCollectedMeta { + fn generate_bounds(&self) -> Bounds { + if self.bounds.len() > 1 { + return self.bounds.clone(); + } + + let Some(blocks) = self.blocks.get(self.blocks.len() / 2) else { + return Bounds::default(); + }; + + blocks + .get(blocks.len() / 2) + .map(|block| match block.domain.len() { + 0 => Bounds::default(), + 1 => Bounds::new_unchecked(block.domain.clone()), + _ => Bounds::new_unchecked(block.domain.slice(0..1)), + }) + .unwrap_or_default() + } +} + +pub struct SortSampleState { + inner: RwLock, + pub(super) done: WatchNotify, +} + +impl SortSampleState { + pub fn new( + inputs: usize, + partitions: usize, + schema: DataSchemaRef, + batch_rows: usize, + ) -> Arc { + Arc::new(SortSampleState { + inner: RwLock::new(StateInner { + partitions, + schema, + partial: vec![None; inputs], + bounds: None, + batch_rows, + }), + done: WatchNotify::new(), + }) + } + + pub fn commit_sample(&self, id: usize, bounds: Bounds) -> Result { + let mut inner = self.inner.write().unwrap(); + + let x = inner.partial[id].replace(bounds); + assert!(x.is_none()); + let done = inner.partial.iter().all(Option::is_some); + if done { + inner.determine_bounds::()?; + self.done.notify_waiters(); + } + Ok(done) + } + + pub fn bounds(&self) -> Bounds { + self.inner + .read() + .unwrap() + .bounds + .clone() + .unwrap_or_default() + } +} + +struct StateInner { + // target partitions + partitions: usize, + schema: DataSchemaRef, + partial: Vec>, + bounds: Option, + batch_rows: usize, +} + +impl StateInner { + fn determine_bounds(&mut self) -> Result<()> { + let v = self.partial.drain(..).map(Option::unwrap).collect(); + let bounds = Bounds::merge::(v, self.batch_rows)?; + + let n = self.partitions - 1; + let bounds = if bounds.len() < n { + bounds + } else { + bounds.dedup_reduce::(n) + }; + assert!(bounds.len() < self.partitions); + + self.bounds = Some(bounds); + Ok(()) + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/transform_merge_sort/sort_spill.rs b/src/query/service/src/pipelines/processors/transforms/sort/sort_spill.rs similarity index 55% rename from src/query/service/src/pipelines/processors/transforms/transform_merge_sort/sort_spill.rs rename to src/query/service/src/pipelines/processors/transforms/sort/sort_spill.rs index 629f0761121bd..440932b258da7 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_merge_sort/sort_spill.rs +++ b/src/query/service/src/pipelines/processors/transforms/sort/sort_spill.rs @@ -17,6 +17,7 @@ use std::fmt; use std::fmt::Debug; use std::fmt::Formatter; use std::intrinsics::unlikely; +use std::marker::PhantomData; use std::mem; use std::sync::atomic; use 
std::sync::atomic::AtomicBool; @@ -29,7 +30,7 @@ use databend_common_expression::sampler::FixedRateSampler; use databend_common_expression::Column; use databend_common_expression::DataBlock; use databend_common_expression::DataSchemaRef; -use databend_common_expression::SortColumnDescription; +use databend_common_expression::Scalar; use databend_common_pipeline_transforms::processors::sort::algorithm::SortAlgorithm; use databend_common_pipeline_transforms::processors::sort::Merger; use databend_common_pipeline_transforms::processors::sort::Rows; @@ -38,12 +39,14 @@ use databend_common_pipeline_transforms::processors::SortSpillParams; use rand::rngs::StdRng; use rand::SeedableRng; +use super::bounds::Bounds; use super::Base; use super::MemoryRows; +use super::SortCollectedMeta; use crate::spillers::Location; use crate::spillers::Spiller; -pub struct SortSpill { +pub(super) struct SortSpill { base: Base, step: Step, } @@ -56,20 +59,21 @@ enum Step { struct StepCollect { params: SortSpillParams, sampler: FixedRateSampler, - streams: Vec>, + streams: Vec>>, } struct StepSort { params: SortSpillParams, - /// Partition boundaries for restoring and sorting blocks, stored in reverse order of Column. + /// Partition boundaries for restoring and sorting blocks. /// Each boundary represents a cutoff point where data less than or equal to it belongs to one partition. - bounds: Vec, - cur_bound: Option, + bounds: Bounds, + cur_bound: Option, - subsequent: Vec>, - current: Vec>, + subsequent: Vec>>, + current: Vec>>, - output_merger: Option>>, + #[expect(clippy::type_complexity)] + output_merger: Option>>>, } impl SortSpill @@ -92,6 +96,33 @@ where A: SortAlgorithm Self { base, step } } + pub fn from_meta(base: Base, meta: SortCollectedMeta) -> Self { + let SortCollectedMeta { + params, + bounds, + blocks, + } = meta; + + let subsequent = blocks + .into_iter() + .filter_map(|list| { + (!list.is_empty()).then(|| base.new_stream(Vec::from(list).into(), None)) + }) + .collect::>(); + debug_assert!(!subsequent.is_empty()); + Self { + base, + step: Step::Sort(StepSort { + params, + bounds, + cur_bound: None, + subsequent, + current: vec![], + output_merger: None, + }), + } + } + pub fn sort_input_data( &mut self, input_data: Vec, @@ -103,7 +134,7 @@ where A: SortAlgorithm collect.sort_input_data(&self.base, input_data, aborting) } - pub async fn subsequent_spill_last(&mut self, target_rows: usize) -> Result<()> { + pub async fn collect_spill_last(&mut self, target_rows: usize) -> Result<()> { let Step::Collect(collect) = &mut self.step else { unreachable!() }; @@ -151,10 +182,41 @@ where A: SortAlgorithm params.num_merge * params.batch_rows } - #[allow(unused)] + #[expect(unused)] pub fn format_memory_usage(&self) -> FmtMemoryUsage<'_, A> { FmtMemoryUsage(self) } + + pub fn dump_collect(self) -> Result { + let Self { + base, + step: Step::Collect(mut collect), + } = self + else { + unreachable!() + }; + + let StepSort { + params, + bounds, + subsequent, + .. 
+ } = collect.next_step(&base)?; + + let blocks = subsequent + .into_iter() + .map(|stream| { + assert!(stream.bound.is_none()); + Vec::from(stream.blocks).into_boxed_slice() + }) + .collect(); + + Ok(SortCollectedMeta { + params, + blocks, + bounds, + }) + } } impl StepCollect { @@ -175,6 +237,8 @@ impl StepCollect { let data = input_data.pop().unwrap(); vec![base.new_block(data)].into() } else { + // todo: using multi-threaded cascade two-way merge sorting algorithm to obtain the best performance + // also see https://arxiv.org/pdf/1406.2628 let mut merger = create_memory_merger::( input_data, base.schema.clone(), @@ -239,20 +303,10 @@ impl StepCollect { impl StepSort { fn next_bound(&mut self) { - let Some(last) = self.bounds.last_mut() else { - self.cur_bound = None; - return; - }; - let bound = match last.len() { - 0 => unreachable!(), - 1 => self.bounds.pop().unwrap(), - _ => { - let bound = last.slice(0..1).maybe_gc(); - *last = last.slice(1..last.len()); - bound - } - }; - self.cur_bound = Some(A::Rows::from_column(&bound).unwrap()); + match self.bounds.next_bound() { + Some(bound) => self.cur_bound = Some(bound), + None => self.cur_bound = None, + } } async fn merge_current(&mut self, base: &Base) -> Result<()> { @@ -442,13 +496,15 @@ impl Base { fn new_stream( &self, blocks: VecDeque, - bound: Option, - ) -> BoundBlockStream { - BoundBlockStream:: { + bound: Option, + ) -> BoundBlockStream> { + assert!(!blocks.is_empty()); + BoundBlockStream { blocks, bound, sort_row_offset: self.sort_row_offset, spiller: self.spiller.clone(), + _r: Default::default(), } } @@ -460,58 +516,46 @@ impl Base { &self, sampled_rows: Vec, batch_rows: usize, - ) -> Result> { + ) -> Result { match sampled_rows.len() { - 0 => Ok(vec![]), - 1 => Ok(vec![DataBlock::sort( - &sampled_rows[0], - &[SortColumnDescription { - offset: 0, - asc: A::Rows::IS_ASC_COLUMN, - nulls_first: false, - }], - None, - )? - .get_last_column() - .clone()]), + 0 => Ok(Bounds::default()), + 1 => Bounds::from_column::(sampled_rows[0].get_last_column().clone()), _ => { - let streams = sampled_rows + let ls = sampled_rows .into_iter() .map(|data| { - let data = DataBlock::sort( - &data, - &[SortColumnDescription { - offset: 0, - asc: A::Rows::IS_ASC_COLUMN, - nulls_first: false, - }], - None, - ) - .unwrap(); - DataBlockStream::new(data, 0) + let col = data.get_last_column().clone(); + Bounds::from_column::(col) }) - .collect::>(); - - let schema = self.schema.project(&[self.sort_row_offset]); - let mut merger = Merger::::create(schema.into(), streams, batch_rows, None); - - let mut blocks = Vec::new(); - while let Some(block) = merger.next_block()? { - blocks.push(block) - } - debug_assert!(merger.is_finished()); + .collect::>>()?; + Bounds::merge::(ls, batch_rows) + } + } + } - Ok(blocks - .iter() - .rev() - .map(|b| b.get_last_column().clone()) - .collect::>()) + pub async fn scatter_stream( + &self, + mut blocks: VecDeque, + mut bounds: Bounds, + ) -> Result>> { + let mut scattered = Vec::with_capacity(bounds.len() + 1); + while !blocks.is_empty() { + let bound = bounds.next_bound(); + let mut stream = self.new_stream::(blocks, bound); + + let mut part = Vec::new(); + while let Some(block) = stream.take_next_bounded_spillable().await? 
{ + part.push(block); } + + scattered.push(part); + blocks = stream.blocks; } + Ok(scattered) } } -impl MemoryRows for Vec> { +impl MemoryRows for Vec> { fn in_memory_rows(&self) -> usize { self.iter().map(|s| s.in_memory_rows()).sum::() } @@ -548,11 +592,11 @@ impl fmt::Debug for FmtMemoryUsage<'_, A> { } } -struct SpillableBlock { +pub struct SpillableBlock { data: Option, rows: usize, location: Option, - domain: Column, + pub(super) domain: Column, processed: usize, } @@ -586,10 +630,10 @@ impl SpillableBlock { R::from_column(&self.domain).unwrap() } - async fn spill(&mut self, spiller: &Spiller) -> Result<()> { + async fn spill(&mut self, spiller: &impl Spill) -> Result<()> { let data = self.data.take().unwrap(); if self.location.is_none() { - let location = spiller.spill(vec![data]).await?; + let location = spiller.spill(data).await?; self.location = Some(location); } Ok(()) @@ -615,15 +659,33 @@ fn sort_column(data: &DataBlock, sort_row_offset: usize) -> &Column { .unwrap() } +#[async_trait::async_trait] +pub trait Spill: Send { + async fn spill(&self, data_block: DataBlock) -> Result; + async fn restore(&self, location: &Location) -> Result; +} + +#[async_trait::async_trait] +impl Spill for Arc { + async fn spill(&self, data_block: DataBlock) -> Result { + self.as_ref().spill(vec![data_block]).await + } + + async fn restore(&self, location: &Location) -> Result { + self.read_spilled_file(location).await + } +} + /// BoundBlockStream is a stream of blocks that are cutoff less or equal than bound. -struct BoundBlockStream { +struct BoundBlockStream { blocks: VecDeque, - bound: Option, + bound: Option, sort_row_offset: usize, - spiller: Arc, + spiller: S, + _r: PhantomData, } -impl Debug for BoundBlockStream { +impl Debug for BoundBlockStream { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("BoundBlockStream") .field("blocks", &self.blocks) @@ -634,7 +696,7 @@ impl Debug for BoundBlockStream { } #[async_trait::async_trait] -impl SortedStream for BoundBlockStream { +impl SortedStream for BoundBlockStream { async fn async_next(&mut self) -> Result<(Option<(DataBlock, Column)>, bool)> { if self.should_include_first() { self.restore_first().await?; @@ -647,15 +709,15 @@ impl SortedStream for BoundBlockStream { } } -impl BoundBlockStream { +impl BoundBlockStream { fn should_include_first(&self) -> bool { let Some(block) = self.blocks.front() else { return false; }; match &self.bound { - Some(bound) => block.domain::().first() <= bound.row(0), None => true, + Some(bound) => block.domain::().first() <= R::scalar_as_item(bound), } } @@ -666,7 +728,7 @@ impl BoundBlockStream { let block = self.blocks.front_mut().unwrap(); if let Some(pos) = - block_split_off_position(block.data.as_ref().unwrap(), bound, self.sort_row_offset) + block_split_off_position::(block.data.as_ref().unwrap(), bound, self.sort_row_offset) { block.slice(pos, self.sort_row_offset) } else { @@ -679,6 +741,23 @@ impl BoundBlockStream { block.data.take().unwrap() } + fn len(&self) -> usize { + self.blocks.len() + } + + fn is_empty(&self) -> bool { + self.len() == 0 + } + + fn in_memory_rows(&self) -> usize { + self.blocks + .iter() + .map(|b| if b.data.is_some() { b.rows } else { 0 }) + .sum() + } +} + +impl BoundBlockStream { async fn restore_first(&mut self) -> Result<()> { let block = self.blocks.front_mut().unwrap(); if block.data.is_some() { @@ -686,7 +765,7 @@ impl BoundBlockStream { } let location = block.location.as_ref().unwrap(); - let data = self.spiller.read_spilled_file(location).await?; + 
let data = self.spiller.restore(location).await?; block.data = Some(if block.processed != 0 { debug_assert_eq!(block.rows + block.processed, data.num_rows()); data.slice(block.processed..data.num_rows()) @@ -703,21 +782,6 @@ impl BoundBlockStream { Ok(()) } - fn len(&self) -> usize { - self.blocks.len() - } - - fn is_empty(&self) -> bool { - self.len() == 0 - } - - fn in_memory_rows(&self) -> usize { - self.blocks - .iter() - .map(|b| if b.data.is_some() { b.rows } else { 0 }) - .sum() - } - async fn spill(&mut self, skip: usize) -> Result<()> { for b in &mut self .blocks @@ -729,17 +793,48 @@ impl BoundBlockStream { } Ok(()) } + + async fn take_next_bounded_spillable(&mut self) -> Result> { + let Some(bound) = &self.bound else { + return Ok(self.blocks.pop_front()); + }; + let Some(block) = self.blocks.front() else { + return Ok(None); + }; + { + let domain = block.domain::(); + let bound_item = R::scalar_as_item(bound); + if domain.first() > bound_item { + return Ok(None); + } + if domain.last() <= bound_item { + return Ok(self.blocks.pop_front()); + } + } + self.restore_first().await?; + + let block = self.blocks.front_mut().unwrap(); + if let Some(pos) = block_split_off_position::( + block.data.as_ref().unwrap(), + self.bound.as_ref().unwrap(), + self.sort_row_offset, + ) { + let data = block.slice(pos, self.sort_row_offset); + Ok(Some(SpillableBlock::new(data, self.sort_row_offset))) + } else { + Ok(self.blocks.pop_front()) + } + } } fn block_split_off_position( data: &DataBlock, - bound: &R, + bound: &Scalar, sort_row_offset: usize, ) -> Option { let rows = R::from_column(sort_column(data, sort_row_offset)).unwrap(); debug_assert!(rows.len() > 0); - debug_assert!(bound.len() == 1); - let bound = bound.row(0); + let bound = R::scalar_as_item(bound); partition_point(&rows, &bound) } @@ -773,7 +868,7 @@ impl SortedStream for DataBlockStream { } impl DataBlockStream { - fn new(data: DataBlock, sort_row_offset: usize) -> Self { + pub(super) fn new(data: DataBlock, sort_row_offset: usize) -> Self { let col = sort_column(&data, sort_row_offset).clone(); Self(Some((data, col))) } @@ -812,25 +907,28 @@ fn get_domain(col: &Column) -> Column { #[cfg(test)] mod tests { + use std::collections::HashMap; + use std::ops::Range; + use std::sync::Mutex; + + use databend_common_base::base::GlobalUniqName; use databend_common_expression::types::DataType; use databend_common_expression::types::Int32Type; use databend_common_expression::types::NumberDataType; + use databend_common_expression::types::NumberScalar; use databend_common_expression::types::StringType; use databend_common_expression::BlockEntry; use databend_common_expression::Column; use databend_common_expression::DataField; use databend_common_expression::DataSchemaRefExt; use databend_common_expression::FromData; + use databend_common_expression::SortColumnDescription; use databend_common_expression::Value; use databend_common_pipeline_transforms::processors::sort::convert_rows; use databend_common_pipeline_transforms::processors::sort::SimpleRowsAsc; use databend_common_pipeline_transforms::sort::SimpleRowsDesc; - use databend_common_storage::DataOperator; use super::*; - use crate::spillers::SpillerConfig; - use crate::spillers::SpillerType; - use crate::test_kits::*; fn test_data() -> (DataSchemaRef, DataBlock) { let col1 = Int32Type::from_data(vec![7, 7, 8, 11, 3, 5, 10, 11]); @@ -847,15 +945,15 @@ mod tests { } async fn run_bound_block_stream( - spiller: Arc, - sort_desc: Arc>, - bound: Column, + spiller: impl Spill + Clone, + 
sort_desc: &[SortColumnDescription], + bound: Scalar, block_part: usize, want: Column, ) -> Result<()> { let (schema, block) = test_data(); - let block = DataBlock::sort(&block, &sort_desc, None)?; - let bound = Some(R::from_column(&bound)?); + let block = DataBlock::sort(&block, sort_desc, None)?; + let bound = Some(bound); let sort_row_offset = schema.fields().len(); let blocks = vec![ @@ -864,17 +962,18 @@ mod tests { ] .into_iter() .map(|mut data| { - let col = convert_rows(schema.clone(), &sort_desc, data.clone()).unwrap(); + let col = convert_rows(schema.clone(), sort_desc, data.clone()).unwrap(); data.add_column(BlockEntry::new(col.data_type(), Value::Column(col))); SpillableBlock::new(data, sort_row_offset) }) .collect::>(); - let mut stream = BoundBlockStream:: { + let mut stream = BoundBlockStream:: { blocks, bound, sort_row_offset, spiller: spiller.clone(), + _r: Default::default(), }; let data = stream.take_next_bounded_block(); @@ -886,29 +985,21 @@ mod tests { #[tokio::test] async fn test_bound_block_stream() -> Result<()> { - let fixture = TestFixture::setup().await?; - let ctx = fixture.new_query_ctx().await?; - - let op = DataOperator::instance().spill_operator(); - let spill_config = SpillerConfig { - spiller_type: SpillerType::OrderBy, - location_prefix: "_spill_test".to_string(), - disk_spill: None, - use_parquet: true, + let spiller = MockSpiller { + map: Arc::new(Mutex::new(HashMap::new())), }; - let spiller = Arc::new(Spiller::create(ctx.clone(), op, spill_config)?); { - let sort_desc = Arc::new(vec![SortColumnDescription { + let sort_desc = [SortColumnDescription { offset: 0, asc: true, nulls_first: false, - }]); + }]; run_bound_block_stream::>( spiller.clone(), - sort_desc.clone(), - Int32Type::from_data(vec![5]), + &sort_desc, + Scalar::Number(NumberScalar::Int32(5)), 4, Int32Type::from_data(vec![3, 5]), ) @@ -916,8 +1007,8 @@ mod tests { run_bound_block_stream::>( spiller.clone(), - sort_desc.clone(), - Int32Type::from_data(vec![8]), + &sort_desc, + Scalar::Number(NumberScalar::Int32(8)), 4, Int32Type::from_data(vec![3, 5, 7, 7]), ) @@ -925,16 +1016,16 @@ mod tests { } { - let sort_desc = Arc::new(vec![SortColumnDescription { + let sort_desc = [SortColumnDescription { offset: 1, asc: false, nulls_first: false, - }]); + }]; run_bound_block_stream::>( spiller.clone(), - sort_desc.clone(), - StringType::from_data(vec!["f"]), + &sort_desc, + Scalar::String("f".to_string()), 4, StringType::from_data(vec!["w", "h", "g", "f"]), ) @@ -943,4 +1034,353 @@ mod tests { Ok(()) } + + fn create_spillable_block( + block: &DataBlock, + range: Range, + schema: &DataSchemaRef, + sort_desc: &[SortColumnDescription], + sort_row_offset: usize, + ) -> SpillableBlock { + let mut sliced_block = block.slice(range); + let col = convert_rows(schema.clone(), sort_desc, sliced_block.clone()).unwrap(); + sliced_block.add_column(BlockEntry::new(col.data_type(), Value::Column(col))); + SpillableBlock::new(sliced_block, sort_row_offset) + } + + async fn prepare_test_blocks( + spiller: &impl Spill, + sort_desc: &[SortColumnDescription], + with_spilled: bool, + with_sliced: bool, + ) -> Result<(DataSchemaRef, VecDeque, usize)> { + let (schema, block) = test_data(); + let block = DataBlock::sort(&block, sort_desc, None)?; + let sort_row_offset = schema.fields().len(); + + // Create multiple blocks with different splits + let mut blocks = VecDeque::new(); + + // First block: 0..2 + blocks.push_back(create_spillable_block( + &block, + 0..2, + &schema, + sort_desc, + sort_row_offset, + )); + + // 
Second block: 2..5 + blocks.push_back(create_spillable_block( + &block, + 2..5, + &schema, + sort_desc, + sort_row_offset, + )); + + // Spill some blocks if requested + if with_spilled { + // Spill the second block + blocks[1].spill(spiller).await?; + } + + if !with_sliced { + // Third block: 5..8 + blocks.push_back(create_spillable_block( + &block, + 5..8, + &schema, + sort_desc, + sort_row_offset, + )); + } else { + // Create a block for values 8..11 (the last part of the sorted data) + let mut spillable_block = + create_spillable_block(&block, 5..8, &schema, sort_desc, sort_row_offset); + + spillable_block.spill(spiller).await?; + spillable_block.data = Some( + spiller + .restore(spillable_block.location.as_ref().unwrap()) + .await?, + ); + + let sliced_data = spillable_block.slice(1, sort_row_offset); + let sliced_block = SpillableBlock::new(sliced_data, sort_row_offset); + + // Add both blocks to maintain the order + blocks.push_back(sliced_block); + blocks.push_back(spillable_block); + } + + Ok((schema, blocks, sort_row_offset)) + } + + async fn collect_and_verify_blocks( + stream: &mut BoundBlockStream, + spiller: &impl Spill, + expected_blocks: &[Column], + ) -> Result<()> { + let mut result_blocks = Vec::new(); + while let Some(mut block) = stream.take_next_bounded_spillable().await? { + // If the block data is None (spilled), restore it first + if block.data.is_none() { + block.data = Some(spiller.restore(block.location.as_ref().unwrap()).await?); + } + + let data = block.data.unwrap(); + let col = sort_column(&data, stream.sort_row_offset).clone(); + result_blocks.push(col); + } + + assert_eq!( + expected_blocks.len(), + result_blocks.len(), + "Number of blocks doesn't match" + ); + for (expected, actual) in expected_blocks.iter().zip(result_blocks.iter()) { + assert_eq!(expected, actual, "Block content doesn't match"); + } + + Ok(()) + } + + async fn run_take_next_bounded_spillable( + spiller: impl Spill + Clone, + sort_desc: &[SortColumnDescription], + bound: Option, + expected_blocks: Vec, + with_spilled: bool, + with_sliced: bool, + ) -> Result<()> { + let (_, blocks, sort_row_offset) = + prepare_test_blocks(&spiller, sort_desc, with_spilled, with_sliced).await?; + + let mut stream = BoundBlockStream:: { + blocks, + bound, + sort_row_offset, + spiller: spiller.clone(), + _r: Default::default(), + }; + + collect_and_verify_blocks(&mut stream, &spiller, &expected_blocks).await + } + + #[tokio::test] + async fn test_take_next_bounded_spillable() -> Result<()> { + let spiller = MockSpiller { + map: Arc::new(Mutex::new(HashMap::new())), + }; + + // Test with ascending Int32 type + { + let sort_desc = [SortColumnDescription { + offset: 0, + asc: true, + nulls_first: false, + }]; + + // Test 1: Basic test with bound = 5 (should return blocks with values <= 5) + // No spilled blocks, no sliced blocks + run_take_next_bounded_spillable::>( + spiller.clone(), + &sort_desc, + Some(Scalar::Number(NumberScalar::Int32(5))), + vec![Int32Type::from_data(vec![3, 5])], + false, + false, + ) + .await?; + + // Test 2: With spilled blocks, bound = 8 (should return blocks with values <= 8) + run_take_next_bounded_spillable::>( + spiller.clone(), + &sort_desc, + Some(Scalar::Number(NumberScalar::Int32(8))), + vec![ + Int32Type::from_data(vec![3, 5]), + Int32Type::from_data(vec![7, 7, 8]), + ], + true, + false, + ) + .await?; + + // Test 3: With sliced blocks, bound = 7 (should return blocks with values <= 7) + run_take_next_bounded_spillable::>( + spiller.clone(), + &sort_desc, + 
Some(Scalar::Number(NumberScalar::Int32(7))), + vec![ + Int32Type::from_data(vec![3, 5]), + Int32Type::from_data(vec![7, 7]), + ], + false, + true, + ) + .await?; + + // Test 4: With both spilled and sliced blocks, bound = 10 + run_take_next_bounded_spillable::>( + spiller.clone(), + &sort_desc, + Some(Scalar::Number(NumberScalar::Int32(10))), + vec![ + Int32Type::from_data(vec![3, 5]), + Int32Type::from_data(vec![7, 7, 8]), + Int32Type::from_data(vec![10]), + ], + true, + true, + ) + .await?; + + // Test 5: With bound = 2 (should return no blocks as all values > 2) + run_take_next_bounded_spillable::>( + spiller.clone(), + &sort_desc, + Some(Scalar::Number(NumberScalar::Int32(2))), + vec![], + true, + true, + ) + .await?; + + // Test 6: With bound = 12 (should return all blocks as all values <= 12) + run_take_next_bounded_spillable::>( + spiller.clone(), + &sort_desc, + Some(Scalar::Number(NumberScalar::Int32(12))), + vec![ + Int32Type::from_data(vec![3, 5]), + Int32Type::from_data(vec![7, 7, 8]), + Int32Type::from_data(vec![10, 11, 11]), + ], + true, + false, + ) + .await?; + + // Test 7: With no bound (should return all blocks) + run_take_next_bounded_spillable::>( + spiller.clone(), + &sort_desc, + None, + vec![ + Int32Type::from_data(vec![3, 5]), + Int32Type::from_data(vec![7, 7, 8]), + Int32Type::from_data(vec![10, 11, 11]), + ], + true, + false, + ) + .await?; + } + + // Test with descending String type + { + let sort_desc = [SortColumnDescription { + offset: 1, + asc: false, + nulls_first: false, + }]; + + // Test 8: With bound = "f" (should return blocks with values >= "f") + run_take_next_bounded_spillable::>( + spiller.clone(), + &sort_desc, + Some(Scalar::String("f".to_string())), + vec![ + StringType::from_data(vec!["w", "h"]), + StringType::from_data(vec!["g", "f"]), + ], + false, + false, + ) + .await?; + + // Test 9: With spilled blocks, bound = "e" (should return blocks with values >= "e") + run_take_next_bounded_spillable::>( + spiller.clone(), + &sort_desc, + Some(Scalar::String("e".to_string())), + vec![ + StringType::from_data(vec!["w", "h"]), + StringType::from_data(vec!["g", "f", "e"]), + StringType::from_data(vec!["e"]), + ], + true, + false, + ) + .await?; + + // Test 10: With sliced blocks, bound = "d" (should return blocks with values >= "d") + run_take_next_bounded_spillable::>( + spiller.clone(), + &sort_desc, + Some(Scalar::String("d".to_string())), + vec![ + StringType::from_data(vec!["w", "h"]), + StringType::from_data(vec!["g", "f", "e"]), + StringType::from_data(vec!["e"]), + StringType::from_data(vec!["d", "d"]), + ], + false, + true, + ) + .await?; + + // Test 11: With both spilled and sliced blocks, bound = "c" (should return all blocks) + run_take_next_bounded_spillable::>( + spiller.clone(), + &sort_desc, + Some(Scalar::String("c".to_string())), + vec![ + StringType::from_data(vec!["w", "h"]), + StringType::from_data(vec!["g", "f", "e"]), + StringType::from_data(vec!["e"]), + StringType::from_data(vec!["d", "d"]), + ], + true, + true, + ) + .await?; + + // Test 12: With bound = "z" (should return no blocks as all values < "z") + run_take_next_bounded_spillable::>( + spiller.clone(), + &sort_desc, + Some(Scalar::String("z".to_string())), + vec![], + true, + true, + ) + .await?; + } + + Ok(()) + } + + #[derive(Clone)] + struct MockSpiller { + map: Arc>>, + } + + #[async_trait::async_trait] + impl Spill for MockSpiller { + async fn spill(&self, data_block: DataBlock) -> Result { + let name = GlobalUniqName::unique(); + 
self.map.lock().unwrap().insert(name.clone(), data_block); + Ok(Location::Remote(name)) + } + + async fn restore(&self, location: &Location) -> Result { + match location { + Location::Remote(name) => Ok(self.map.lock().unwrap().get(name).unwrap().clone()), + _ => unreachable!(), + } + } + } } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_merge_sort/builder.rs b/src/query/service/src/pipelines/processors/transforms/transform_merge_sort/builder.rs deleted file mode 100644 index 313e69d4412c6..0000000000000 --- a/src/query/service/src/pipelines/processors/transforms/transform_merge_sort/builder.rs +++ /dev/null @@ -1,179 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use databend_common_exception::Result; -use databend_common_expression::DataSchemaRef; -use databend_common_expression::SortColumnDescription; -use databend_common_pipeline_core::processors::InputPort; -use databend_common_pipeline_core::processors::OutputPort; -use databend_common_pipeline_core::processors::Processor; -use databend_common_pipeline_transforms::processors::sort::algorithm::SortAlgorithm; -use databend_common_pipeline_transforms::sort::algorithm::HeapSort; -use databend_common_pipeline_transforms::sort::algorithm::LoserTreeSort; -use databend_common_pipeline_transforms::sort::select_row_type; -use databend_common_pipeline_transforms::sort::utils::add_order_field; -use databend_common_pipeline_transforms::sort::utils::ORDER_COL_NAME; -use databend_common_pipeline_transforms::sort::RowConverter; -use databend_common_pipeline_transforms::sort::Rows; -use databend_common_pipeline_transforms::sort::RowsTypeVisitor; -use databend_common_pipeline_transforms::MemorySettings; - -use super::TransformSort; -use crate::spillers::Spiller; - -pub struct TransformSortBuilder { - input: Arc, - output: Arc, - schema: DataSchemaRef, - block_size: usize, - sort_desc: Arc<[SortColumnDescription]>, - order_col_generated: bool, - output_order_col: bool, - memory_settings: MemorySettings, - spiller: Arc, - enable_loser_tree: bool, - limit: Option, - processor: Option>>, -} - -impl TransformSortBuilder { - pub fn create( - input: Arc, - output: Arc, - schema: DataSchemaRef, - sort_desc: Arc<[SortColumnDescription]>, - block_size: usize, - spiller: Arc, - ) -> Self { - Self { - input, - output, - block_size, - schema, - sort_desc, - spiller, - order_col_generated: false, - output_order_col: false, - enable_loser_tree: false, - limit: None, - memory_settings: MemorySettings::disable_spill(), - processor: None, - } - } - - pub fn with_order_col_generated(mut self, order_col_generated: bool) -> Self { - self.order_col_generated = order_col_generated; - self - } - - pub fn with_output_order_col(mut self, output_order_col: bool) -> Self { - self.output_order_col = output_order_col; - self - } - - pub fn with_limit(mut self, limit: Option) -> Self { - self.limit = limit; - self - } - - pub fn with_memory_settings(mut self, memory_settings: 
MemorySettings) -> Self { - self.memory_settings = memory_settings; - self - } - - pub fn with_enable_loser_tree(mut self, enable_loser_tree: bool) -> Self { - self.enable_loser_tree = enable_loser_tree; - self - } - - pub fn build(mut self) -> Result> { - debug_assert!(if self.output_order_col { - self.schema.has_field(ORDER_COL_NAME) - } else { - !self.schema.has_field(ORDER_COL_NAME) - }); - - select_row_type(&mut self); - self.processor.unwrap() - } - - fn build_sort(&mut self) -> Result> - where - A: SortAlgorithm + 'static, - C: RowConverter + Send + 'static, - { - let schema = add_order_field(self.schema.clone(), &self.sort_desc); - Ok(Box::new(TransformSort::::new( - self.input.clone(), - self.output.clone(), - schema, - self.sort_desc.clone(), - self.block_size, - self.limit.map(|limit| (limit, false)), - self.spiller.clone(), - self.output_order_col, - self.order_col_generated, - self.memory_settings.clone(), - )?)) - } - - fn build_sort_limit(&mut self) -> Result> - where - A: SortAlgorithm + 'static, - C: RowConverter + Send + 'static, - { - let schema = add_order_field(self.schema.clone(), &self.sort_desc); - Ok(Box::new(TransformSort::::new( - self.input.clone(), - self.output.clone(), - schema, - self.sort_desc.clone(), - self.block_size, - Some((self.limit.unwrap(), true)), - self.spiller.clone(), - self.output_order_col, - self.order_col_generated, - self.memory_settings.clone(), - )?)) - } -} - -impl RowsTypeVisitor for TransformSortBuilder { - fn schema(&self) -> DataSchemaRef { - self.schema.clone() - } - - fn sort_desc(&self) -> &[SortColumnDescription] { - &self.sort_desc - } - - fn visit_type(&mut self) - where - R: Rows + 'static, - C: RowConverter + Send + 'static, - { - let processor = match ( - self.limit.map(|limit| limit < 10000).unwrap_or_default(), - self.enable_loser_tree, - ) { - (true, true) => self.build_sort_limit::, C>(), - (true, false) => self.build_sort_limit::, C>(), - (false, true) => self.build_sort::, C>(), - (false, false) => self.build_sort::, C>(), - }; - self.processor = Some(processor) - } -} diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index f43462179ef0f..a4f314fc0b936 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -605,6 +605,13 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(4 * 1024..=u64::MAX)), }), + ("enable_range_shuffle_sort", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "Enable range shuffle sort.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=1)), + }), ("group_by_shuffle_mode", DefaultSettingValue { value: UserSettingValue::String(String::from("before_merge")), desc: "Group by shuffle mode, 'before_partial' is more balanced, but more data needs to exchange.", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index da82c54a7dd4e..c9482937c2838 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -486,6 +486,10 @@ impl Settings { Ok(self.try_get_u64("sort_spilling_memory_ratio")? as usize) } + pub fn get_enable_range_shuffle_sort(&self) -> Result { + Ok(self.try_get_u64("enable_range_shuffle_sort")? 
== 1) + } + pub fn get_group_by_shuffle_mode(&self) -> Result { self.try_get_string("group_by_shuffle_mode") } diff --git a/tests/sqllogictests/suites/mode/standalone/explain/window.test b/tests/sqllogictests/suites/mode/standalone/explain/window.test index 469a7088d4cd9..c0a5aa212603a 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/window.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/window.test @@ -45,23 +45,25 @@ statement ok set sort_spilling_memory_ratio = 0; statement ok -set enable_parallel_multi_merge_sort = 0; +set enable_range_shuffle_sort = 0; query T explain pipeline SELECT depname, empno, salary, sum(salary) OVER (PARTITION BY depname ORDER BY empno) FROM empsalary ORDER BY depname, empno; ---- CompoundBlockOperator(Project) × 1 - Merge to MultiSortMerge × 1 - TransformSortMerge × 4 - SortPartialTransform × 4 - Merge to Resize × 4 - Transform Window × 1 - TransformWindowPartitionCollect(Sort) × 1 - ShuffleMergePartition(Window) × 1 - ShufflePartition(Window) × 1 - DeserializeDataTransform × 1 - SyncReadParquetDataTransform × 1 - BlockPartitionSource × 1 + Merge to KWayMergeCombiner × 1 + KWayMergeWorker × 4 + KWayMergePartitioner × 1 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + Transform Window × 1 + TransformWindowPartitionCollect(Sort) × 1 + ShuffleMergePartition(Window) × 1 + ShufflePartition(Window) × 1 + DeserializeDataTransform × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # Enable sort spilling @@ -72,17 +74,19 @@ query T explain pipeline SELECT depname, empno, salary, sum(salary) OVER (PARTITION BY depname ORDER BY empno) FROM empsalary ORDER BY depname, empno; ---- CompoundBlockOperator(Project) × 1 - Merge to MultiSortMerge × 1 - TransformSortMerge × 4 - SortPartialTransform × 4 - Merge to Resize × 4 - Transform Window × 1 - TransformWindowPartitionCollect(Sort) × 1 - ShuffleMergePartition(Window) × 1 - ShufflePartition(Window) × 1 - DeserializeDataTransform × 1 - SyncReadParquetDataTransform × 1 - BlockPartitionSource × 1 + Merge to KWayMergeCombiner × 1 + KWayMergeWorker × 4 + KWayMergePartitioner × 1 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + Transform Window × 1 + TransformWindowPartitionCollect(Sort) × 1 + ShuffleMergePartition(Window) × 1 + ShufflePartition(Window) × 1 + DeserializeDataTransform × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 statement ok @@ -447,17 +451,19 @@ avg(a) over (order by a rows between unbounded preceding and current row) from t CompoundBlockOperator(Project) × 1 LimitTransform × 1 Transform Window × 1 - Merge to MultiSortMerge × 1 - TransformSortMerge × 4 - SortPartialTransform × 4 - Merge to Resize × 4 - Transform Window × 1 - TransformWindowPartitionCollect(Sort) × 1 - ShuffleMergePartition(Window) × 1 - ShufflePartition(Window) × 1 - DeserializeDataTransform × 1 - SyncReadParquetDataTransform × 1 - BlockPartitionSource × 1 + Merge to KWayMergeCombiner × 1 + KWayMergeWorker × 4 + KWayMergePartitioner × 1 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + Transform Window × 1 + TransformWindowPartitionCollect(Sort) × 1 + ShuffleMergePartition(Window) × 1 + ShufflePartition(Window) × 1 + DeserializeDataTransform × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # row fetch with window function(pipeline explain) query T @@ -466,19 +472,21 @@ explain pipeline select *, sum(a) over (partition by a order by a desc rows betw CompoundBlockOperator(Project) × 1 
TransformRowsFetcher × 1 LimitTransform × 1 - Merge to MultiSortMerge × 1 - TransformSortMergeLimit × 4 - SortPartialTransform × 4 - Merge to Resize × 4 - Transform Window × 1 - TransformWindowPartitionCollect(Sort) × 1 - ShuffleMergePartition(Window) × 1 - ShufflePartition(Window) × 1 - TransformFilter × 1 - AddInternalColumnsTransform × 1 - DeserializeDataTransform × 1 - SyncReadParquetDataTransform × 1 - BlockPartitionSource × 1 + Merge to KWayMergeCombiner × 1 + KWayMergeWorker × 4 + KWayMergePartitioner × 1 + TransformSortMergeLimit × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + Transform Window × 1 + TransformWindowPartitionCollect(Sort) × 1 + ShuffleMergePartition(Window) × 1 + ShufflePartition(Window) × 1 + TransformFilter × 1 + AddInternalColumnsTransform × 1 + DeserializeDataTransform × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # row fetch with window function(plan explain) query T @@ -559,12 +567,14 @@ CompoundBlockOperator(Project) × 1 Transform Window × 1 CompoundBlockOperator(Map) × 1 Transform Window × 1 - Merge to MultiSortMerge × 1 - TransformSortMerge × 4 - SortPartialTransform × 4 - Merge to Resize × 4 - CompoundBlockOperator(Map) × 1 - NumbersSourceTransform × 1 + Merge to KWayMergeCombiner × 1 + KWayMergeWorker × 4 + KWayMergePartitioner × 1 + TransformSortMerge × 4 + SortPartialTransform × 4 + Merge to Resize × 4 + CompoundBlockOperator(Map) × 1 + NumbersSourceTransform × 1 # same order same partiton by multi window query T diff --git a/tests/sqllogictests/suites/stage/formats/parquet/read_policy.test b/tests/sqllogictests/suites/stage/formats/parquet/read_policy.test index df51ce3bb1325..075d2f24d28b9 100644 --- a/tests/sqllogictests/suites/stage/formats/parquet/read_policy.test +++ b/tests/sqllogictests/suites/stage/formats/parquet/read_policy.test @@ -122,7 +122,7 @@ select id, t:a from @data/parquet/tuple.parquet where t:a > 1; # topk does not contain output query TT -select id, t:b from @data/parquet/tuple.parquet order by t:a desc limit 2; +select id, t:b from @data/parquet/tuple.parquet order by t:a desc, id desc limit 2; ---- 3 c 2 b @@ -135,7 +135,7 @@ select t, t:a from @data/parquet/tuple.parquet order by id desc limit 2; # topk contains output query TT -select id, t:b, t:a from @data/parquet/tuple.parquet order by t:a desc limit 2; +select id, t:b, t:a from @data/parquet/tuple.parquet order by t:a desc, id desc limit 2; ---- 3 c 3 2 b 3
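A minimal usage sketch (not part of the patch): the new `enable_range_shuffle_sort` setting defaults to 0, so the range-shuffle path has to be enabled per session with the same SET syntax the suites above use; the table and query are taken from the window tests.

SET enable_range_shuffle_sort = 1;

SELECT depname, empno, salary,
       sum(salary) OVER (PARTITION BY depname ORDER BY empno)
FROM empsalary
ORDER BY depname, empno;

With the setting at 1 the sort should be planned through the new Shuffle/Combine/Execute transforms; the existing explain tests pin it to 0 so their K-way merge pipelines stay stable.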