
Commit 91c30f7

Commit message: reset
1 parent 2e135ce commit 91c30f7

File tree

6 files changed: +13 lines, -96 lines


src/query/expression/src/utils/block_thresholds.rs

Lines changed: 7 additions & 25 deletions
```diff
@@ -153,7 +153,7 @@ impl BlockThresholds {
         let bytes_per_block = total_bytes.div_ceil(block_num_by_compressed);
         // Adjust the number of blocks based on block size thresholds.
         let max_bytes_per_block = self.max_bytes_per_block.min(400 * 1024 * 1024);
-        let min_bytes_per_block = max_bytes_per_block / 2;
+        let min_bytes_per_block = max_bytes_per_block / 3;
         let block_nums = if bytes_per_block > max_bytes_per_block {
             // Case 1: the block size is too big.
             total_bytes.div_ceil(max_bytes_per_block)
@@ -190,29 +190,11 @@
             return 1;
         }
 
-        // Estimate the number of blocks based on row count and compressed size.
-        let by_rows = std::cmp::max(total_rows / self.max_rows_per_block, 1);
-        let by_compressed = total_compressed / self.max_compressed_per_block;
-        // If row-based block count is greater, use max rows per block as limit.
-        if by_rows >= by_compressed {
-            return by_rows;
-        }
-
-        // Adjust block count based on byte size thresholds.
-        let bytes_per_block = total_bytes.div_ceil(by_compressed);
-        let max_bytes = self.max_bytes_per_block.min(400 * 1024 * 1024);
-        let min_bytes = max_bytes / 2;
-        let total_partitions = if bytes_per_block > max_bytes {
-            // Block size is too large.
-            total_bytes / max_bytes
-        } else if bytes_per_block < min_bytes {
-            // Block size is too small.
-            total_bytes / min_bytes
-        } else {
-            // Block size is acceptable.
-            by_compressed
-        };
-
-        std::cmp::max(total_partitions, 1)
+        let by_rows = std::cmp::max(total_rows.div_ceil(65536), 1);
+        let by_bytes = std::cmp::max(
+            total_compressed.div_ceil(self.min_compressed_per_block),
+            total_bytes.div_ceil(self.min_bytes_per_block),
+        );
+        std::cmp::max(by_rows, by_bytes)
     }
 }
```
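To make the revised estimate concrete, here is a minimal standalone sketch of the same arithmetic. The two threshold constants are assumptions for illustration only; in the commit they come from BlockThresholds, and 65536 is the hard-coded row divisor visible in the diff.

```rust
// Sketch of the new block-count estimate (threshold values are assumed).
fn estimate_block_nums(total_rows: usize, total_bytes: usize, total_compressed: usize) -> usize {
    let min_compressed_per_block = 16 * 1024 * 1024; // assumed: 16 MiB
    let min_bytes_per_block = 50 * 1024 * 1024; // assumed: 50 MiB
    let by_rows = std::cmp::max(total_rows.div_ceil(65536), 1);
    let by_bytes = std::cmp::max(
        total_compressed.div_ceil(min_compressed_per_block),
        total_bytes.div_ceil(min_bytes_per_block),
    );
    // Taking the max lets whichever limit demands more blocks win, so no
    // block exceeds the row budget or either byte budget.
    std::cmp::max(by_rows, by_bytes)
}

fn main() {
    // 1M rows, 600 MiB in memory, 150 MiB compressed:
    // by_rows = ceil(1_000_000 / 65536) = 16, by_bytes = max(10, 12) = 12.
    assert_eq!(estimate_block_nums(1_000_000, 600 << 20, 150 << 20), 16);
}
```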

src/query/service/src/pipelines/builders/builder_hilbert_partition.rs

Lines changed: 0 additions & 2 deletions
```diff
@@ -79,7 +79,6 @@ impl PipelineBuilder {
                         window_spill_settings.clone(),
                         disk_spill.clone(),
                         partition.rows_per_block,
-                        partition.bytes_per_block,
                     )?,
                 )))
             })?;
@@ -93,7 +92,6 @@
                     table,
                     partition.table_meta_timestamps,
                     false,
-                    Some(partition.bytes_per_block),
                 )
             })
         } else {
```

src/query/service/src/pipelines/processors/transforms/window/partition/transform_hilbert_collect.rs

Lines changed: 1 addition & 49 deletions
```diff
@@ -37,26 +37,21 @@ use crate::spillers::SpillerType;
 
 enum State {
     Collect,
-    Flush,
     Spill,
     Restore,
-    Concat(Vec<DataBlock>),
 }
 
 pub struct TransformHilbertCollect {
     input: Arc<InputPort>,
     output: Arc<OutputPort>,
 
-    immediate_output_blocks: Vec<(usize, DataBlock)>,
     output_data_blocks: VecDeque<DataBlock>,
 
     // The partition id is used to map the partition id to the new partition id.
     partition_id: Vec<usize>,
-    partition_sizes: Vec<usize>,
     // The buffer is used to control the memory usage of the window operator.
     buffer: WindowPartitionBuffer,
 
-    max_block_size: usize,
     // Event variables.
     state: State,
 }
@@ -74,7 +69,6 @@ impl TransformHilbertCollect {
         memory_settings: MemorySettings,
         disk_spill: Option<SpillerDiskConfig>,
         max_block_rows: usize,
-        max_block_size: usize,
     ) -> Result<Self> {
         // Calculate the partition ids collected by the processor.
         let partitions: Vec<usize> = (0..num_partitions)
@@ -110,9 +104,6 @@
             output,
             partition_id,
             buffer,
-            immediate_output_blocks: vec![],
-            partition_sizes: vec![0; num_partitions],
-            max_block_size,
             output_data_blocks: VecDeque::new(),
             state: State::Collect,
         })
@@ -130,11 +121,7 @@ impl Processor for TransformHilbertCollect {
     }
 
     fn event(&mut self) -> Result<Event> {
-        if matches!(self.state, State::Concat(_)) {
-            return Ok(Event::Sync);
-        }
-
-        if matches!(self.state, State::Flush | State::Spill | State::Restore) {
+        if matches!(self.state, State::Spill | State::Restore) {
             return Ok(Event::Async);
         }
 
@@ -157,11 +144,6 @@
             return Ok(Event::Async);
         }
 
-        if !self.immediate_output_blocks.is_empty() {
-            self.state = State::Flush;
-            return Ok(Event::Async);
-        }
-
         if self.input.is_finished() {
             if !self.buffer.is_empty() {
                 self.state = State::Restore;
@@ -179,42 +161,18 @@
                 self.state = State::Spill;
                 return Ok(Event::Async);
             }
-
-            if !self.immediate_output_blocks.is_empty() {
-                self.state = State::Flush;
-                return Ok(Event::Async);
-            }
         }
 
         self.input.set_need_data();
         Ok(Event::NeedData)
     }
 
-    fn process(&mut self) -> Result<()> {
-        match std::mem::replace(&mut self.state, State::Collect) {
-            State::Concat(blocks) => {
-                let output = DataBlock::concat(&blocks)?;
-                self.output_data_blocks.push_back(output);
-            }
-            _ => unreachable!(),
-        }
-        Ok(())
-    }
-
     #[async_backtrace::framed]
     async fn async_process(&mut self) -> Result<()> {
         match std::mem::replace(&mut self.state, State::Collect) {
             State::Spill => {
                 self.buffer.spill().await?;
             }
-            State::Flush => {
-                if let Some((partition_id, data_block)) = self.immediate_output_blocks.pop() {
-                    let mut restored_data_blocks =
-                        self.buffer.restore_by_id(partition_id, true).await?;
-                    restored_data_blocks.push(data_block);
-                    self.state = State::Concat(restored_data_blocks);
-                }
-            }
             State::Restore => {
                 let restored_data_blocks = self.buffer.restore().await?;
                 self.output_data_blocks.extend(restored_data_blocks);
@@ -234,12 +192,6 @@ impl TransformHilbertCollect {
         {
             for (partition_id, data_block) in meta.partitioned_data.into_iter() {
                 let new_id = self.partition_id[partition_id];
-                self.partition_sizes[new_id] += data_block.estimate_block_size();
-                if self.partition_sizes[new_id] >= self.max_block_size {
-                    self.immediate_output_blocks.push((new_id, data_block));
-                    self.partition_sizes[new_id] = 0;
-                    continue;
-                }
                 self.buffer.add_data_block(new_id, data_block);
             }
         }
```
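With the immediate-flush path removed, the processor is left with a three-state machine, which is why the synchronous process() step could be deleted. Below is a minimal sketch of the resulting control flow, assuming only the event-scheduling behavior visible in the diff; it is an illustration, not the actual Processor implementation.

```rust
// Sketch of the reduced state machine: with Flush and Concat gone, no state
// needs a synchronous step, so only Async and NeedData scheduling remain.
enum State {
    Collect,
    Spill,
    Restore,
}

enum Event {
    Async,
    NeedData,
}

fn event(state: &State) -> Event {
    match state {
        // Spill and Restore both perform I/O and run in async_process.
        State::Spill | State::Restore => Event::Async,
        // Collect keeps pulling input.
        State::Collect => Event::NeedData,
    }
}

fn main() {
    assert!(matches!(event(&State::Spill), Event::Async));
    assert!(matches!(event(&State::Collect), Event::NeedData));
}
```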

src/query/storages/fuse/src/io/write/stream/block_builder.rs

Lines changed: 1 addition & 9 deletions
```diff
@@ -233,16 +233,11 @@ impl StreamBlockBuilder {
     }
 
     pub fn need_flush(&self) -> bool {
-        if let Some(max_block_bytes) = self.properties.max_block_bytes {
-            if self.block_size >= max_block_bytes {
-                return true;
-            }
-        };
         let file_size = self.block_writer.compressed_size();
         self.row_count >= self.properties.block_thresholds.min_rows_per_block
             || self.block_size >= self.properties.block_thresholds.max_bytes_per_block
             || (file_size >= self.properties.block_thresholds.min_compressed_per_block
-                && self.block_size >= self.properties.block_thresholds.min_bytes_per_block)
+                && self.block_size >= self.properties.block_thresholds.max_bytes_per_block / 4)
     }
 
     pub fn write(&mut self, block: DataBlock) -> Result<()> {
@@ -355,7 +350,6 @@ pub struct StreamBlockProperties {
     pub(crate) ctx: Arc<dyn TableContext>,
     pub(crate) write_settings: WriteSettings,
     pub(crate) block_thresholds: BlockThresholds,
-    pub(crate) max_block_bytes: Option<usize>,
 
     meta_locations: TableMetaLocationGenerator,
     source_schema: TableSchemaRef,
@@ -374,7 +368,6 @@ impl StreamBlockProperties {
         ctx: Arc<dyn TableContext>,
         table: &FuseTable,
         table_meta_timestamps: TableMetaTimestamps,
-        max_block_bytes: Option<usize>,
     ) -> Result<Arc<Self>> {
         // remove virtual computed fields.
         let fields = table
@@ -434,7 +427,6 @@
             ngram_args,
             inverted_index_builders,
             table_meta_timestamps,
-            max_block_bytes,
         }))
     }
 }
```
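The new need_flush condition reads as three independent triggers. Here is a minimal sketch with assumed threshold values; in the commit the real values come from self.properties.block_thresholds.

```rust
// Sketch of the new flush predicate. All threshold values are assumptions.
struct Thresholds {
    min_rows_per_block: usize,
    max_bytes_per_block: usize,
    min_compressed_per_block: usize,
}

fn need_flush(t: &Thresholds, row_count: usize, block_size: usize, file_size: usize) -> bool {
    row_count >= t.min_rows_per_block
        || block_size >= t.max_bytes_per_block
        // Early flush once the compressed output is large enough AND the
        // in-memory size reaches a quarter of the max; this replaces both
        // the removed per-stream max_block_bytes check and the old
        // min_bytes_per_block comparison.
        || (file_size >= t.min_compressed_per_block
            && block_size >= t.max_bytes_per_block / 4)
}

fn main() {
    let t = Thresholds {
        min_rows_per_block: 800_000,                // assumed
        max_bytes_per_block: 100 * 1024 * 1024,     // assumed: 100 MiB
        min_compressed_per_block: 16 * 1024 * 1024, // assumed: 16 MiB
    };
    // 30 MiB in memory (>= the 25 MiB quarter) and 20 MiB compressed: flush.
    assert!(need_flush(&t, 100_000, 30 << 20, 20 << 20));
}
```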

src/query/storages/fuse/src/operations/append.rs

Lines changed: 0 additions & 1 deletion
```diff
@@ -61,7 +61,6 @@ impl FuseTable {
                     self,
                     table_meta_timestamps,
                     false,
-                    None,
                 )
             })?;
         } else {
```

src/query/storages/fuse/src/operations/common/processors/transform_block_writer.rs

Lines changed: 4 additions & 10 deletions
```diff
@@ -85,14 +85,12 @@ impl TransformBlockWriter {
         table: &FuseTable,
         table_meta_timestamps: TableMetaTimestamps,
         with_tid: bool,
-        max_block_bytes: Option<usize>,
     ) -> Result<ProcessorPtr> {
         let max_block_rows = std::cmp::min(
             ctx.get_settings().get_max_block_size()? as usize,
             table.get_option(FUSE_OPT_KEY_ROW_PER_BLOCK, DEFAULT_BLOCK_ROW_COUNT),
         );
-        let properties =
-            StreamBlockProperties::try_create(ctx, table, table_meta_timestamps, max_block_bytes)?;
+        let properties = StreamBlockProperties::try_create(ctx, table, table_meta_timestamps)?;
         Ok(ProcessorPtr::create(Box::new(TransformBlockWriter {
             state: State::Consume,
             input,
@@ -207,13 +205,9 @@
                 block.check_valid()?;
                 self.input_data_size += block.estimate_block_size();
                 self.input_num_rows += block.num_rows();
-                if self.properties.max_block_bytes.is_some() {
-                    self.input_data.push_back(block);
-                } else {
-                    let max_rows_per_block = self.calc_max_block_rows(&block);
-                    let blocks = block.split_by_rows_no_tail(max_rows_per_block);
-                    self.input_data.extend(blocks);
-                }
+                let max_rows_per_block = self.calc_max_block_rows(&block);
+                let blocks = block.split_by_rows_no_tail(max_rows_per_block);
+                self.input_data.extend(blocks);
             }
             State::Serialize => {
                 while let Some(b) = self.input_data.pop_front() {
```
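With the max_block_bytes branch gone, every incoming block now goes through row-based splitting. The sketch below mimics the shape of that call on row counts only; split_no_tail is a hypothetical stand-in for one plausible reading of split_by_rows_no_tail (near-even chunks with no tiny remainder), not the library's exact semantics.

```rust
// Hypothetical stand-in for DataBlock::split_by_rows_no_tail, reduced to row
// counts: split into ceil(rows / max_rows) near-even chunks so no undersized
// tail chunk is produced.
fn split_no_tail(rows: usize, max_rows: usize) -> Vec<usize> {
    let chunks = rows.div_ceil(max_rows).max(1);
    let base = rows / chunks;
    let extra = rows % chunks;
    (0..chunks).map(|i| base + usize::from(i < extra)).collect()
}

fn main() {
    // A 250k-row block with a 100k budget becomes three near-even chunks.
    assert_eq!(split_no_tail(250_000, 100_000), vec![83_334, 83_333, 83_333]);
}
```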
