Skip to content

feat(query): range shuffle sort for standalone mode #17853

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 37 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
340a5e2
reservoir_sampling
forsaken628 Nov 8, 2024
40fa33b
Simpler
forsaken628 Nov 8, 2024
ac42d38
TransformSortSimple
forsaken628 Nov 12, 2024
809535c
rename
forsaken628 Apr 15, 2025
d6dd7b9
fix
forsaken628 Apr 18, 2025
7f5e43b
trait Spill
forsaken628 Apr 21, 2025
f192ca4
execute
forsaken628 Apr 21, 2025
4a2bed4
move
forsaken628 Apr 21, 2025
c4b1182
update
forsaken628 Apr 22, 2025
71fce4c
bounds
forsaken628 Apr 22, 2025
7c3ae2b
wait
forsaken628 Apr 22, 2025
7966294
remove
forsaken628 Apr 23, 2025
0fe057d
scalar
forsaken628 Apr 23, 2025
5312e49
exchange
forsaken628 Apr 23, 2025
751505d
update
forsaken628 Apr 23, 2025
3a1af37
test
forsaken628 Apr 23, 2025
b430371
x
forsaken628 Apr 24, 2025
83094fe
route
forsaken628 Apr 24, 2025
5cb259b
Merge remote-tracking branch 'up/main' into range-shuffle
forsaken628 Apr 24, 2025
b7b15af
builder
forsaken628 Apr 24, 2025
056c3d9
update
forsaken628 Apr 24, 2025
a5af6b7
build
forsaken628 Apr 24, 2025
5039da3
rename
forsaken628 Apr 24, 2025
61383d0
fix
forsaken628 Apr 25, 2025
a20533f
Merge remote-tracking branch 'up/main' into range-shuffle
forsaken628 Apr 25, 2025
28f5adf
fix
forsaken628 Apr 25, 2025
533262d
fix
forsaken628 Apr 25, 2025
88a11cb
fix
forsaken628 Apr 25, 2025
0ee6154
fix
forsaken628 Apr 25, 2025
3906b41
update
forsaken628 Apr 25, 2025
2c82187
fix
forsaken628 Apr 26, 2025
5e0ae93
Merge remote-tracking branch 'up/main' into range-shuffle
forsaken628 Apr 27, 2025
cef81b4
fix
forsaken628 Apr 27, 2025
44df0b0
Merge remote-tracking branch 'up/main' into range-shuffle
forsaken628 Apr 27, 2025
f288368
fix
forsaken628 Apr 27, 2025
5fa4b81
fix
forsaken628 Apr 27, 2025
8f49873
fix
forsaken628 Apr 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions src/common/base/src/base/watch_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ impl WatchNotify {
let _ = rx.changed().await;
}

/// Returns `true` when the watch channel holds a value this receiver has not
/// yet observed, i.e. after `notify_waiters` has been called.
pub fn has_notified(&self) -> bool {
    // `has_changed` only reports an error once the sending half is dropped.
    // `self` keeps the sender alive in `self.tx`, so that case is not expected.
    self.rx.has_changed().unwrap_or_else(|_| unreachable!())
}

/// Marks the notification as sent by storing `true` in the watch channel.
///
/// The value previously held by the channel is discarded.
pub fn notify_waiters(&self) {
    let _previous = self.tx.send_replace(true);
}
Expand All @@ -61,11 +71,18 @@ mod tests {
// Checks that a notification sent before a waiter is created (or awaited)
// is still observed, and that `has_notified` reflects the notification state.
#[tokio::test]
async fn test_notify_waiters_ahead() {
let notify = WatchNotify::new();
// Nothing has been sent yet.
assert!(!notify.has_notified());
// Create one waiter before the notification is sent; it is awaited last.
let notified1 = notify.notified();
// Creating a waiter alone must not count as a notification.
assert!(!notify.has_notified());

// notify_waiters ahead of notified being instantiated and awaited
notify.notify_waiters();

assert!(notify.has_notified());
// this should not await indefinitely
let notified = notify.notified();
notified.await;
// A second waiter created after the notification must also complete.
let notified2 = notify.notified();
notified2.await;

// The waiter created before the notification must complete as well.
notified1.await;
// Awaiting the waiters above must leave the notified state in place.
assert!(notify.has_notified());
}
}
2 changes: 1 addition & 1 deletion src/query/expression/src/types/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl<T: ValueType> ValueType for ArrayType<T> {
scalar.clone()
}

fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option<Self::ScalarRef<'a>> {
fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option<Self::ScalarRef<'a>> {
match scalar {
ScalarRef::Array(array) => T::try_downcast_column(array),
_ => None,
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl ValueType for BooleanType {
*scalar
}

fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option<Self::ScalarRef<'a>> {
fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option<Self::ScalarRef<'a>> {
match scalar {
ScalarRef::Boolean(scalar) => Some(*scalar),
_ => None,
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl ValueType for DateType {
*scalar
}

fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option<Self::ScalarRef<'a>> {
fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option<Self::ScalarRef<'a>> {
match scalar {
ScalarRef::Date(scalar) => Some(*scalar),
_ => None,
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl<Num: Decimal> ValueType for DecimalType<Num> {
*scalar
}

fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option<Self::ScalarRef<'a>> {
fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option<Self::ScalarRef<'a>> {
Num::try_downcast_scalar(scalar.as_decimal()?)
}

Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/empty_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl ValueType for EmptyArrayType {
*scalar
}

fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option<Self::ScalarRef<'a>> {
fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option<Self::ScalarRef<'a>> {
match scalar {
ScalarRef::EmptyArray => Some(()),
_ => None,
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/empty_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl ValueType for EmptyMapType {
*scalar
}

fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option<Self::ScalarRef<'a>> {
fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option<Self::ScalarRef<'a>> {
match scalar {
ScalarRef::EmptyMap => Some(()),
_ => None,
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl ValueType for IntervalType {
*scalar
}

fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option<Self::ScalarRef<'a>> {
fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option<Self::ScalarRef<'a>> {
match scalar {
ScalarRef::Interval(scalar) => Some(*scalar),
_ => None,
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ impl<K: ValueType, V: ValueType> ValueType for MapType<K, V> {
<MapInternal<K, V> as ValueType>::to_scalar_ref(scalar)
}

fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option<Self::ScalarRef<'a>> {
fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option<Self::ScalarRef<'a>> {
match scalar {
ScalarRef::Map(array) => KvPair::<K, V>::try_downcast_column(array),
_ => None,
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl ValueType for NullType {
*scalar
}

fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option<Self::ScalarRef<'a>> {
fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option<Self::ScalarRef<'a>> {
match scalar {
ScalarRef::Null => Some(()),
_ => None,
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl<Num: Number> ValueType for NumberType<Num> {
*scalar
}

fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option<Self::ScalarRef<'a>> {
fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option<Self::ScalarRef<'a>> {
Num::try_downcast_scalar(scalar.as_number()?)
}

Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/types/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl ValueType for TimestampType {
*scalar
}

fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option<Self::ScalarRef<'a>> {
fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option<Self::ScalarRef<'a>> {
match scalar {
ScalarRef::Timestamp(scalar) => Some(*scalar),
_ => None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub use transform_compact_builder::*;
pub use transform_compact_no_split_builder::*;
pub use transform_dummy::*;
pub use transform_k_way_merge_sort::*;
pub use transform_multi_sort_merge::try_add_multi_sort_merge;
pub use transform_multi_sort_merge::*;
pub use transform_pipeline_helper::TransformPipelineHelper;
pub use transform_retry_async::*;
pub use transform_sort_merge::sort_merge;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ impl Rows for BinaryColumn {
fn slice(&self, range: Range<usize>) -> Self {
self.slice(range)
}

/// Borrows the payload of a `Scalar::Binary` as a row item.
///
/// # Panics
/// Panics if `s` is any other `Scalar` variant.
fn scalar_as_item<'a>(s: &'a Scalar) -> Self::Item<'a> {
    let Scalar::Binary(bytes) = s else {
        unreachable!()
    };
    bytes
}

/// Copies a borrowed row item into an owned `Scalar::Binary`.
fn owned_item(item: Self::Item<'_>) -> Scalar {
    Scalar::Binary(item.into())
}
}

impl RowConverter<BinaryColumn> for CommonRowConverter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod simple;
mod utils;

use std::fmt::Debug;
use std::ops::Range;

pub use common::*;
use databend_common_exception::ErrorCode;
Expand All @@ -25,7 +26,9 @@ use databend_common_expression::types::ArgType;
use databend_common_expression::types::DataType;
use databend_common_expression::BlockEntry;
use databend_common_expression::Column;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::Scalar;
use databend_common_expression::SortColumnDescription;
pub use simple::*;
pub use utils::*;
Expand All @@ -39,6 +42,18 @@ where Self: Sized + Debug
output_schema: DataSchemaRef,
) -> Result<Self>;
fn convert(&self, columns: &[BlockEntry], num_rows: usize) -> Result<T>;

/// Converts the sort-key entries of `data_block` into rows.
///
/// For every description in `sort_desc`, the entry at `desc.offset` is cloned
/// out of the block (in description order) and the collected entries are
/// passed to [`Self::convert`] together with the block's row count.
fn convert_data_block(
    &self,
    sort_desc: &[SortColumnDescription],
    data_block: &DataBlock,
) -> Result<T> {
    let mut order_by_cols = Vec::with_capacity(sort_desc.len());
    for desc in sort_desc {
        order_by_cols.push(data_block.get_by_offset(desc.offset).clone());
    }
    self.convert(&order_by_cols, data_block.num_rows())
}
}

/// Rows can be compared.
Expand Down Expand Up @@ -82,5 +97,9 @@ where Self: Sized + Clone + Debug + Send
self.row(self.len() - 1)
}

fn slice(&self, range: std::ops::Range<usize>) -> Self;
fn slice(&self, range: Range<usize>) -> Self;

/// Views a `Scalar` as a row item that borrows from it for `'a`.
///
/// NOTE(review): the implementations visible alongside this trait panic when
/// the scalar's variant does not match the row type — confirm callers only
/// pass matching scalars.
fn scalar_as_item<'a>(s: &'a Scalar) -> Self::Item<'a>;

/// Converts a (possibly borrowed) row item into an owned `Scalar`.
fn owned_item(item: Self::Item<'_>) -> Scalar;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use databend_common_expression::BlockEntry;
use databend_common_expression::Column;
use databend_common_expression::ColumnBuilder;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::Scalar;
use databend_common_expression::SortColumnDescription;
use databend_common_expression::Value;

Expand Down Expand Up @@ -70,6 +71,15 @@ where
inner: T::slice_column(&self.inner, range),
}
}

/// Downcasts a `Scalar` to the item type of `T`.
///
/// # Panics
/// Panics if `T::try_downcast_scalar` rejects the scalar.
fn scalar_as_item<'a>(s: &'a Scalar) -> Self::Item<'a> {
    let scalar_ref = s.as_ref();
    let item = T::try_downcast_scalar(&scalar_ref);
    item.unwrap()
}

/// Turns a row item into an owned, type-erased `Scalar`.
fn owned_item(item: Self::Item<'_>) -> Scalar {
    let owned = T::to_owned_scalar(item);
    T::upcast_scalar(owned)
}
}

/// Rows structure for single simple types. (numbers, date, timestamp)
Expand Down Expand Up @@ -113,6 +123,15 @@ where
inner: T::slice_column(&self.inner, range),
}
}

/// Downcasts a `Scalar` to the item type of `T` and wraps it in `Reverse`,
/// matching the item type of these rows.
///
/// # Panics
/// Panics if `T::try_downcast_scalar` rejects the scalar.
fn scalar_as_item<'a>(s: &'a Scalar) -> Self::Item<'a> {
    let scalar_ref = s.as_ref();
    let item = T::try_downcast_scalar(&scalar_ref).unwrap();
    Reverse(item)
}

fn owned_item(item: Self::Item<'_>) -> Scalar {
T::upcast_scalar(T::to_owned_scalar(item.0))
}
}

/// If there is only one sort field and its type is a primitive type,
Expand Down
Loading
Loading