
Commit 3ada53b

add compact strategy
1 parent 2e135ce commit 3ada53b


7 files changed: +114 -79 lines changed


src/query/service/src/pipelines/processors/transforms/mod.rs

Lines changed: 2 additions & 0 deletions

@@ -17,6 +17,7 @@ pub mod aggregator;
 mod broadcast;
 mod hash_join;
 pub(crate) mod range_join;
+mod recluster;
 mod runtime_pool;
 mod transform_add_computed_columns;
 mod transform_add_const_columns;
@@ -45,6 +46,7 @@ mod window;
 pub use broadcast::BroadcastSinkProcessor;
 pub use broadcast::BroadcastSourceProcessor;
 pub use hash_join::*;
+pub use recluster::*;
 pub use transform_add_computed_columns::TransformAddComputedColumns;
 pub use transform_add_const_columns::TransformAddConstColumns;
 pub use transform_add_internal_columns::TransformAddInternalColumns;
Lines changed: 78 additions & 0 deletions

@@ -0,0 +1,78 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use databend_common_exception::Result;
+use databend_common_expression::DataBlock;
+
+use crate::pipelines::processors::transforms::DataProcessorStrategy;
+
+pub struct CompactStrategy {
+    max_bytes_per_block: usize,
+    max_rows_per_block: usize,
+}
+
+impl CompactStrategy {
+    pub fn new(max_rows_per_block: usize, max_bytes_per_block: usize) -> Self {
+        Self {
+            max_bytes_per_block,
+            max_rows_per_block,
+        }
+    }
+
+    fn concat_blocks(blocks: Vec<DataBlock>) -> Result<DataBlock> {
+        DataBlock::concat(&blocks)
+    }
+
+    fn check_large_enough(&self, rows: usize, bytes: usize) -> bool {
+        rows >= self.max_rows_per_block || bytes >= self.max_bytes_per_block
+    }
+}
+
+impl DataProcessorStrategy for CompactStrategy {
+    const NAME: &'static str = "Compact";
+
+    fn process_data_blocks(&self, data_blocks: Vec<DataBlock>) -> Result<Vec<DataBlock>> {
+        let blocks_num = data_blocks.len();
+        if blocks_num < 2 {
+            return Ok(data_blocks);
+        }
+
+        let mut accumulated_rows = 0;
+        let mut accumulated_bytes = 0;
+        let mut pending_blocks = Vec::with_capacity(blocks_num);
+        let mut staged_blocks = Vec::with_capacity(blocks_num);
+        let mut result = Vec::with_capacity(blocks_num);
+        for block in data_blocks {
+            accumulated_rows += block.num_rows();
+            accumulated_bytes += block.estimate_block_size();
+            pending_blocks.push(block);
+            if !self.check_large_enough(accumulated_rows, accumulated_bytes) {
+                continue;
+            }
+            if !staged_blocks.is_empty() {
+                result.push(Self::concat_blocks(std::mem::take(&mut staged_blocks))?);
+            }
+            std::mem::swap(&mut staged_blocks, &mut pending_blocks);
+            accumulated_rows = 0;
+            accumulated_bytes = 0;
+        }
+
+        staged_blocks.append(&mut pending_blocks);
+        if !staged_blocks.is_empty() {
+            result.push(Self::concat_blocks(std::mem::take(&mut staged_blocks))?);
+        }
+
+        Ok(result)
+    }
+}
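
The strategy defers emission: blocks accumulate in pending until the row or byte threshold is crossed, the previously staged group (if any) is concatenated and emitted, and the just-filled group becomes the new staged group, so trailing small blocks can still join the last group. A minimal, self-contained sketch of that grouping logic, using a hypothetical FakeBlock type instead of Databend's DataBlock API, behaves the same way:

// A minimal sketch of the batching logic over hypothetical (rows, bytes) blocks;
// FakeBlock and the thresholds are stand-ins, not the Databend DataBlock API.
struct FakeBlock {
    rows: usize,
    bytes: usize,
}

// Group blocks so that each group reaches the row or byte threshold, mirroring
// CompactStrategy::process_data_blocks (which would then concat each group).
fn compact(blocks: Vec<FakeBlock>, max_rows: usize, max_bytes: usize) -> Vec<Vec<FakeBlock>> {
    let mut result = Vec::new();
    let mut staged: Vec<FakeBlock> = Vec::new();
    let mut pending: Vec<FakeBlock> = Vec::new();
    let (mut rows, mut bytes) = (0usize, 0usize);
    for block in blocks {
        rows += block.rows;
        bytes += block.bytes;
        pending.push(block);
        if rows < max_rows && bytes < max_bytes {
            continue; // keep accumulating small blocks
        }
        if !staged.is_empty() {
            result.push(std::mem::take(&mut staged)); // emit the previously staged group
        }
        std::mem::swap(&mut staged, &mut pending); // the just-filled group becomes staged
        rows = 0;
        bytes = 0;
    }
    staged.append(&mut pending); // leftovers join the last staged group
    if !staged.is_empty() {
        result.push(staged);
    }
    result
}

fn main() {
    // Ten 1000-row blocks with a 4096-row threshold collapse into two groups of five.
    let blocks: Vec<FakeBlock> = (0..10).map(|_| FakeBlock { rows: 1000, bytes: 1 << 20 }).collect();
    let groups = compact(blocks, 4096, usize::MAX);
    assert_eq!(groups.iter().map(|g| g.len()).collect::<Vec<_>>(), vec![5, 5]);
    println!("group sizes: {:?}", groups.iter().map(|g| g.len()).collect::<Vec<_>>());
}

In the real strategy, each group would then be merged by DataBlock::concat, so many small spill-sized blocks come out as a few blocks close to the configured limits.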
Lines changed: 0 additions & 4 deletions

@@ -12,10 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Some variables and functions are named and designed with reference to ClickHouse.
-// - https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/Transforms/WindowTransform.h
-// - https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/Transforms/WindowTransform.cpp
-
 use std::sync::Arc;
 
 use databend_common_exception::Result;
Lines changed: 21 additions & 0 deletions

@@ -0,0 +1,21 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod compact_strategy;
+mod hilbert_partition_exchange;
+mod transform_hilbert_collect;
+
+pub use compact_strategy::CompactStrategy;
+pub use hilbert_partition_exchange::HilbertPartitionExchange;
+pub use transform_hilbert_collect::TransformHilbertCollect;
Lines changed: 13 additions & 11 deletions

@@ -27,8 +27,10 @@ use databend_common_pipeline_transforms::MemorySettings;
 use databend_common_settings::Settings;
 use databend_common_storage::DataOperator;
 
-use super::WindowPartitionBuffer;
-use super::WindowPartitionMeta;
+use crate::pipelines::processors::transforms::CompactStrategy;
+use crate::pipelines::processors::transforms::DataProcessorStrategy;
+use crate::pipelines::processors::transforms::WindowPartitionBuffer;
+use crate::pipelines::processors::transforms::WindowPartitionMeta;
 use crate::sessions::QueryContext;
 use crate::spillers::Spiller;
 use crate::spillers::SpillerConfig;
@@ -40,7 +42,7 @@ enum State {
     Flush,
     Spill,
     Restore,
-    Concat(Vec<DataBlock>),
+    Compact(Vec<DataBlock>),
 }
 
 pub struct TransformHilbertCollect {
@@ -56,6 +58,7 @@ pub struct TransformHilbertCollect {
     // The buffer is used to control the memory usage of the window operator.
     buffer: WindowPartitionBuffer,
 
+    compact_strategy: CompactStrategy,
     max_block_size: usize,
     // Event variables.
     state: State,
@@ -100,8 +103,6 @@ impl TransformHilbertCollect {
         let spiller = Spiller::create(ctx, operator, spill_config)?;
 
         // Create the window partition buffer.
-        let max_block_rows =
-            max_block_rows.min(settings.get_window_partition_sort_block_size()? as usize);
         let buffer =
             WindowPartitionBuffer::new(spiller, partitions.len(), max_block_rows, memory_settings)?;
 
@@ -113,6 +114,7 @@ impl TransformHilbertCollect {
             immediate_output_blocks: vec![],
             partition_sizes: vec![0; num_partitions],
             max_block_size,
+            compact_strategy: CompactStrategy::new(max_block_rows, max_block_size),
             output_data_blocks: VecDeque::new(),
             state: State::Collect,
         })
@@ -130,7 +132,7 @@ impl Processor for TransformHilbertCollect {
     }
 
     fn event(&mut self) -> Result<Event> {
-        if matches!(self.state, State::Concat(_)) {
+        if matches!(self.state, State::Compact(_)) {
             return Ok(Event::Sync);
         }
 
@@ -192,9 +194,9 @@
 
     fn process(&mut self) -> Result<()> {
        match std::mem::replace(&mut self.state, State::Collect) {
-            State::Concat(blocks) => {
-                let output = DataBlock::concat(&blocks)?;
-                self.output_data_blocks.push_back(output);
+            State::Compact(blocks) => {
+                let output = self.compact_strategy.process_data_blocks(blocks)?;
+                self.output_data_blocks.extend(output);
            }
            _ => unreachable!(),
        }
@@ -212,12 +214,12 @@
                    let mut restored_data_blocks =
                        self.buffer.restore_by_id(partition_id, true).await?;
                    restored_data_blocks.push(data_block);
-                    self.state = State::Concat(restored_data_blocks);
+                    self.state = State::Compact(restored_data_blocks);
                }
            }
            State::Restore => {
                let restored_data_blocks = self.buffer.restore().await?;
-                self.output_data_blocks.extend(restored_data_blocks);
+                self.state = State::Compact(restored_data_blocks);
            }
            _ => unreachable!(),
        }
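
Both restore paths now park their blocks in State::Compact instead of emitting them directly, so the next Sync step runs CompactStrategy over them; note that CompactStrategy::new takes rows first and bytes second, matching the (max_block_rows, max_block_size) call above even though the struct declares max_bytes_per_block first. A simplified, hypothetical sketch of that control flow (stand-in types, not the real Processor trait or Databend APIs) ties the pieces together:

// Hypothetical, simplified stand-ins; the real code uses Databend's Processor
// trait, DataBlock, and the full State enum (Collect/Flush/Spill/Restore/Compact).
#[derive(Debug)]
struct Block {
    rows: usize,
}

enum State {
    Collect,
    Compact(Vec<Block>),
}

struct Collector {
    state: State,
    output: Vec<Block>,
}

impl Collector {
    // event(): a pending Compact state means CPU-bound work, so ask for Sync.
    fn needs_sync(&self) -> bool {
        matches!(self.state, State::Compact(_))
    }

    // async_process(): restored spill blocks are no longer pushed to the output
    // directly; they are parked in Compact so the next sync step can merge them.
    fn finish_restore(&mut self, restored: Vec<Block>) {
        self.state = State::Compact(restored);
    }

    // process(): run the strategy over the staged blocks and queue its output.
    fn process(&mut self, strategy: impl Fn(Vec<Block>) -> Vec<Block>) {
        if let State::Compact(blocks) = std::mem::replace(&mut self.state, State::Collect) {
            self.output.extend(strategy(blocks));
        }
    }
}

fn main() {
    let mut c = Collector { state: State::Collect, output: vec![] };
    c.finish_restore(vec![Block { rows: 100 }, Block { rows: 200 }]);
    assert!(c.needs_sync());
    // Stand-in "strategy" that merges everything into a single block.
    c.process(|blocks| vec![Block { rows: blocks.iter().map(|b| b.rows).sum() }]);
    println!("{:?}", c.output); // [Block { rows: 300 }]
}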

src/query/service/src/pipelines/processors/transforms/window/partition/data_processor_strategy.rs

Lines changed: 0 additions & 60 deletions

@@ -24,66 +24,6 @@ pub trait DataProcessorStrategy: Send + Sync + 'static {
     fn process_data_blocks(&self, data_blocks: Vec<DataBlock>) -> Result<Vec<DataBlock>>;
 }
 
-pub struct CompactStrategy {
-    max_bytes_per_block: usize,
-    max_rows_per_block: usize,
-}
-
-impl CompactStrategy {
-    pub fn new(max_rows_per_block: usize, max_bytes_per_block: usize) -> Self {
-        Self {
-            max_bytes_per_block,
-            max_rows_per_block,
-        }
-    }
-
-    fn concat_blocks(blocks: Vec<DataBlock>) -> Result<DataBlock> {
-        DataBlock::concat(&blocks)
-    }
-
-    fn check_large_enough(&self, rows: usize, bytes: usize) -> bool {
-        rows >= self.max_rows_per_block || bytes >= self.max_bytes_per_block
-    }
-}
-
-impl DataProcessorStrategy for CompactStrategy {
-    const NAME: &'static str = "Compact";
-
-    fn process_data_blocks(&self, data_blocks: Vec<DataBlock>) -> Result<Vec<DataBlock>> {
-        let blocks_num = data_blocks.len();
-        if blocks_num < 2 {
-            return Ok(data_blocks);
-        }
-
-        let mut accumulated_rows = 0;
-        let mut accumulated_bytes = 0;
-        let mut pending_blocks = Vec::with_capacity(blocks_num);
-        let mut staged_blocks = Vec::with_capacity(blocks_num);
-        let mut result = Vec::with_capacity(blocks_num);
-        for block in data_blocks {
-            accumulated_rows += block.num_rows();
-            accumulated_bytes += block.estimate_block_size();
-            pending_blocks.push(block);
-            if !self.check_large_enough(accumulated_rows, accumulated_bytes) {
-                continue;
-            }
-            if !staged_blocks.is_empty() {
-                result.push(Self::concat_blocks(std::mem::take(&mut staged_blocks))?);
-            }
-            std::mem::swap(&mut staged_blocks, &mut pending_blocks);
-            accumulated_rows = 0;
-            accumulated_bytes = 0;
-        }
-
-        staged_blocks.append(&mut pending_blocks);
-        if !staged_blocks.is_empty() {
-            result.push(Self::concat_blocks(std::mem::take(&mut staged_blocks))?);
-        }
-
-        Ok(result)
-    }
-}
-
 pub struct SortStrategy {
     sort_desc: Vec<SortColumnDescription>,
     schema: DataSchemaRef,
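
What remains in this file is the trait the strategies implement: an associated NAME plus a single process_data_blocks method. A stand-alone sketch with stand-in Block and Result types (hypothetical; the real trait operates on databend_common_expression::DataBlock) shows the contract that CompactStrategy and SortStrategy both satisfy:

// Stand-alone sketch of the trait's shape with stand-in Block and Result types;
// the real trait works over databend_common_expression::DataBlock.
type Result<T> = std::result::Result<T, String>;

struct Block(usize);

trait DataProcessorStrategy: Send + Sync + 'static {
    const NAME: &'static str;
    fn process_data_blocks(&self, data_blocks: Vec<Block>) -> Result<Vec<Block>>;
}

// A trivial pass-through strategy (hypothetical, for illustration only): it
// returns the blocks untouched, the smallest possible implementation of the contract.
struct PassThroughStrategy;

impl DataProcessorStrategy for PassThroughStrategy {
    const NAME: &'static str = "PassThrough";

    fn process_data_blocks(&self, data_blocks: Vec<Block>) -> Result<Vec<Block>> {
        Ok(data_blocks)
    }
}

fn main() -> Result<()> {
    let strategy = PassThroughStrategy;
    let out = strategy.process_data_blocks(vec![Block(10), Block(20)])?;
    println!(
        "{} kept {} blocks ({} rows)",
        PassThroughStrategy::NAME,
        out.len(),
        out.iter().map(|b| b.0).sum::<usize>()
    );
    Ok(())
}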

src/query/service/src/pipelines/processors/transforms/window/partition/mod.rs

Lines changed: 0 additions & 4 deletions

@@ -13,17 +13,13 @@
 // limitations under the License.
 
 mod data_processor_strategy;
-mod hilbert_partition_exchange;
-mod transform_hilbert_collect;
 mod transform_window_partition_collect;
 mod window_partition_buffer;
 mod window_partition_exchange;
 mod window_partition_meta;
 mod window_partition_partial_top_n_exchange;
 
 pub use data_processor_strategy::*;
-pub use hilbert_partition_exchange::*;
-pub use transform_hilbert_collect::*;
 pub use transform_window_partition_collect::*;
 pub use window_partition_buffer::*;
 pub use window_partition_exchange::*;
