Skip to content

Commit 254d5b9

Browse files
committed
refactor: rename components of the replace into impl (#17355)
refactor: rename components of `replace into`. Initially, `REPLACE INTO` was designed as a simplified implementation of `MERGE INTO`, with the expectation that it would gradually evolve into a full-fledged implementation of `MERGE INTO`. As a result, many `MERGE INTO`-related terms exist in the components of the `REPLACE INTO` impl. However, `MERGE INTO` now has its own independent impl, so the remaining `MERGE INTO`-related terms in the impl of `REPLACE INTO` can be misleading. Therefore, in this PR, we have renamed all `MERGE INTO`-related terms in the impl of `REPLACE INTO` to avoid confusion.
1 parent 63cf08b commit 254d5b9

10 files changed

+77
-79
lines changed

src/query/storages/fuse/src/operations/replace.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use rand::prelude::SliceRandom;
2929
use crate::io::BlockBuilder;
3030
use crate::io::ReadSettings;
3131
use crate::operations::mutation::SegmentIndex;
32-
use crate::operations::replace_into::MergeIntoOperationAggregator;
32+
use crate::operations::replace_into::ReplaceIntoOperationAggregator;
3333
use crate::FuseTable;
3434

3535
impl FuseTable {
@@ -102,7 +102,7 @@ impl FuseTable {
102102
let read_settings = ReadSettings::from_ctx(&ctx)?;
103103
let mut items = Vec::with_capacity(num_partition);
104104
for chunk_of_segment_locations in chunks {
105-
let item = MergeIntoOperationAggregator::try_create(
105+
let item = ReplaceIntoOperationAggregator::try_create(
106106
ctx.clone(),
107107
on_conflicts.clone(),
108108
bloom_filter_column_indexes.clone(),

src/query/storages/fuse/src/operations/replace_into/meta/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
mod merge_into_operation_meta;
15+
mod replace_into_operation_meta;
1616

17-
pub use merge_into_operation_meta::*;
17+
pub use replace_into_operation_meta::*;

src/query/storages/fuse/src/operations/replace_into/meta/merge_into_operation_meta.rs renamed to src/query/storages/fuse/src/operations/replace_into/meta/replace_into_operation_meta.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,8 @@ use databend_common_expression::BlockMetaInfoDowncast;
1919
use databend_common_expression::DataBlock;
2020
use databend_common_expression::Scalar;
2121

22-
// This mod need to be refactored, since it not longer aiming to be
23-
// used in the implementation of `MERGE INTO` statement in the future.
24-
//
25-
// unfortunately, distributed `replace-into` is being implemented in parallel,
26-
// to avoid the potential heavy merge conflicts, the refactoring is postponed.
27-
2822
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
29-
pub enum MergeIntoOperation {
23+
pub enum ReplaceIntoOperation {
3024
Delete(Vec<DeletionByColumn>),
3125
None,
3226
}
@@ -43,8 +37,8 @@ pub struct DeletionByColumn {
4337
pub bloom_hashes: Vec<RowBloomHashes>,
4438
}
4539

46-
#[typetag::serde(name = "merge_into_operation_meta")]
47-
impl BlockMetaInfo for MergeIntoOperation {
40+
#[typetag::serde(name = "replace_into_operation_meta")]
41+
impl BlockMetaInfo for ReplaceIntoOperation {
4842
fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
4943
Self::downcast_ref_from(info).is_some_and(|other| self == other)
5044
}
@@ -54,16 +48,16 @@ impl BlockMetaInfo for MergeIntoOperation {
5448
}
5549
}
5650

57-
impl TryFrom<DataBlock> for MergeIntoOperation {
51+
impl TryFrom<DataBlock> for ReplaceIntoOperation {
5852
type Error = ErrorCode;
5953

6054
fn try_from(value: DataBlock) -> Result<Self, Self::Error> {
6155
let meta = value.get_owned_meta().ok_or_else(|| {
6256
ErrorCode::Internal(
63-
"convert MergeIntoOperation from data block failed, no block meta found",
57+
"convert ReplaceIntoOperation from data block failed, no block meta found",
6458
)
6559
})?;
66-
MergeIntoOperation::downcast_from(meta).ok_or_else(|| {
60+
ReplaceIntoOperation::downcast_from(meta).ok_or_else(|| {
6761
ErrorCode::Internal(
6862
"downcast block meta to MutationIntoOperation failed, type mismatch",
6963
)

src/query/storages/fuse/src/operations/replace_into/mutator/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414

1515
mod column_hash;
1616
mod deletion_accumulator;
17-
mod merge_into_mutator;
18-
mod mutator_replace_into;
17+
mod replace_into_mutator;
18+
mod replace_into_operation_agg;
1919

2020
pub use column_hash::row_hash_of_columns;
2121
pub use deletion_accumulator::BlockDeletionKeys;
2222
pub use deletion_accumulator::DeletionAccumulator;
23-
pub use merge_into_mutator::MergeIntoOperationAggregator;
24-
pub use mutator_replace_into::ReplaceIntoMutator;
23+
pub use replace_into_mutator::ReplaceIntoMutator;
24+
pub use replace_into_operation_agg::ReplaceIntoOperationAggregator;

src/query/storages/fuse/src/operations/replace_into/mutator/mutator_replace_into.rs renamed to src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_mutator.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,14 @@ use databend_storages_common_table_meta::meta::ColumnStatistics;
4646
use log::info;
4747

4848
use crate::operations::replace_into::meta::DeletionByColumn;
49-
use crate::operations::replace_into::meta::MergeIntoOperation;
49+
use crate::operations::replace_into::meta::ReplaceIntoOperation;
5050
use crate::operations::replace_into::meta::UniqueKeyDigest;
5151
use crate::operations::replace_into::mutator::column_hash::row_hash_of_columns;
5252
use crate::operations::replace_into::mutator::column_hash::RowScalarValue;
5353

5454
// Replace is somehow a simplified merge_into, which
5555
// - do insertion for "matched" branch
56-
// - update for "not-matched" branch (by sending MergeIntoOperation to downstream)
56+
// - update for "not-matched" branch (by sending ReplaceIntoOperation to downstream)
5757
pub struct ReplaceIntoMutator {
5858
on_conflict_fields: Vec<OnConflictField>,
5959
table_range_index: HashMap<ColumnId, ColumnStatistics>,
@@ -100,7 +100,7 @@ enum ColumnHash {
100100
}
101101

102102
impl ReplaceIntoMutator {
103-
pub fn process_input_block(&mut self, data_block: &DataBlock) -> Result<MergeIntoOperation> {
103+
pub fn process_input_block(&mut self, data_block: &DataBlock) -> Result<ReplaceIntoOperation> {
104104
// pruning rows by using table level range index
105105
// rows that definitely have no conflict will be removed
106106
metrics_inc_replace_original_row_number(data_block.num_rows() as u64);
@@ -111,10 +111,10 @@ impl ReplaceIntoMutator {
111111

112112
if row_number_after_pruning == 0 {
113113
info!("(replace-into) all rows are append-only");
114-
return Ok(MergeIntoOperation::None);
114+
return Ok(ReplaceIntoOperation::None);
115115
}
116116

117-
let merge_into_operation = if let Some(partitioner) = &self.partitioner {
117+
let replace_into_operation = if let Some(partitioner) = &self.partitioner {
118118
// if table has cluster keys; we partition the input data block by left most column of cluster keys
119119
let partitions = partitioner.partition(data_block)?;
120120
metrics_inc_replace_partition_number(partitions.len() as u64);
@@ -137,12 +137,12 @@ impl ReplaceIntoMutator {
137137
}
138138
})
139139
.collect();
140-
MergeIntoOperation::Delete(vs)
140+
ReplaceIntoOperation::Delete(vs)
141141
} else {
142142
// otherwise, we just build a single delete action
143-
self.build_merge_into_operation(&data_block_may_have_conflicts)?
143+
self.build_replace_into_operation(&data_block_may_have_conflicts)?
144144
};
145-
Ok(merge_into_operation)
145+
Ok(replace_into_operation)
146146
}
147147

148148
// filter out rows that definitely have no conflict, by using table level range index
@@ -171,7 +171,10 @@ impl ReplaceIntoMutator {
171171
data_block.clone().filter_with_bitmap(&bitmap)
172172
}
173173

174-
fn build_merge_into_operation(&mut self, data_block: &DataBlock) -> Result<MergeIntoOperation> {
174+
fn build_replace_into_operation(
175+
&mut self,
176+
data_block: &DataBlock,
177+
) -> Result<ReplaceIntoOperation> {
175178
let num_rows = data_block.num_rows();
176179
let column_values = on_conflict_key_column_values(&self.on_conflict_fields, data_block);
177180

@@ -183,7 +186,7 @@ impl ReplaceIntoMutator {
183186
key_hashes,
184187
bloom_hashes: vec![],
185188
};
186-
Ok(MergeIntoOperation::Delete(vec![delete_action]))
189+
Ok(ReplaceIntoOperation::Delete(vec![delete_action]))
187190
}
188191
ColumnHash::Conflict(conflict_row_idx) => {
189192
let conflict_description = {

src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs renamed to src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_operation_agg.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ use crate::operations::mutation::BlockIndex;
7373
use crate::operations::mutation::SegmentIndex;
7474
use crate::operations::read_block;
7575
use crate::operations::replace_into::meta::DeletionByColumn;
76-
use crate::operations::replace_into::meta::MergeIntoOperation;
76+
use crate::operations::replace_into::meta::ReplaceIntoOperation;
7777
use crate::operations::replace_into::meta::UniqueKeyDigest;
7878
use crate::operations::replace_into::mutator::row_hash_of_columns;
7979
use crate::operations::replace_into::mutator::DeletionAccumulator;
@@ -104,12 +104,12 @@ struct AggregationContext {
104104
}
105105

106106
// Apply MergeIntoOperations to segments
107-
pub struct MergeIntoOperationAggregator {
107+
pub struct ReplaceIntoOperationAggregator {
108108
deletion_accumulator: DeletionAccumulator,
109109
aggregation_ctx: Arc<AggregationContext>,
110110
}
111111

112-
impl MergeIntoOperationAggregator {
112+
impl ReplaceIntoOperationAggregator {
113113
#[allow(clippy::too_many_arguments)] // TODO fix this
114114
pub fn try_create(
115115
ctx: Arc<dyn TableContext>,
@@ -216,15 +216,15 @@ impl MergeIntoOperationAggregator {
216216
}
217217

218218
// aggregate mutations (currently, deletion only)
219-
impl MergeIntoOperationAggregator {
219+
impl ReplaceIntoOperationAggregator {
220220
#[async_backtrace::framed]
221-
pub async fn accumulate(&mut self, merge_into_operation: MergeIntoOperation) -> Result<()> {
221+
pub async fn accumulate(&mut self, replace_into_operation: ReplaceIntoOperation) -> Result<()> {
222222
let aggregation_ctx = &self.aggregation_ctx;
223223
metrics_inc_replace_number_accumulated_merge_action();
224224

225225
let start = Instant::now();
226-
match merge_into_operation {
227-
MergeIntoOperation::Delete(partitions) => {
226+
match replace_into_operation {
227+
ReplaceIntoOperation::Delete(partitions) => {
228228
for (segment_index, (path, ver)) in &aggregation_ctx.segment_locations {
229229
// segment level
230230
let load_param = LoadParams {
@@ -280,7 +280,7 @@ impl MergeIntoOperationAggregator {
280280
}
281281
}
282282
}
283-
MergeIntoOperation::None => {}
283+
ReplaceIntoOperation::None => {}
284284
}
285285

286286
metrics_inc_replace_accumulated_merge_action_time_ms(start.elapsed().as_millis() as u64);
@@ -289,7 +289,7 @@ impl MergeIntoOperationAggregator {
289289
}
290290

291291
// apply the mutations and generate mutation log
292-
impl MergeIntoOperationAggregator {
292+
impl ReplaceIntoOperationAggregator {
293293
#[async_backtrace::framed]
294294
pub async fn apply(&mut self) -> Result<Option<MutationLogs>> {
295295
metrics_inc_replace_number_apply_deletion();

src/query/storages/fuse/src/operations/replace_into/processors/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
mod processor_broadcast;
1818
mod processor_replace_into;
1919
mod processor_unbranched_replace_into;
20-
mod transform_merge_into_mutation_aggregator;
20+
mod transform_replace_into_mutation_aggregator;
2121

2222
pub use processor_broadcast::BroadcastProcessor;
2323
pub use processor_replace_into::ReplaceIntoProcessor;

src/query/storages/fuse/src/operations/replace_into/processors/processor_replace_into.rs

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,11 @@ pub struct ReplaceIntoProcessor {
4949

5050
// stage data blocks
5151
input_port: Arc<InputPort>,
52-
output_port_merge_into_action: Arc<OutputPort>,
52+
output_port_replace_into_action: Arc<OutputPort>,
5353
output_port_append_data: Arc<OutputPort>,
5454

5555
input_data: Option<DataBlock>,
56-
output_data_merge_into_action: Option<DataBlock>,
56+
output_data_replace_into_action: Option<DataBlock>,
5757
output_data_append: Option<DataBlock>,
5858

5959
target_table_empty: bool,
@@ -83,16 +83,16 @@ impl ReplaceIntoProcessor {
8383
table_range_idx,
8484
)?;
8585
let input_port = InputPort::create();
86-
let output_port_merge_into_action = OutputPort::create();
86+
let output_port_replace_into_action = OutputPort::create();
8787
let output_port_append_data = OutputPort::create();
8888

8989
Ok(Self {
9090
replace_into_mutator,
9191
input_port,
92-
output_port_merge_into_action,
92+
output_port_replace_into_action,
9393
output_port_append_data,
9494
input_data: None,
95-
output_data_merge_into_action: None,
95+
output_data_replace_into_action: None,
9696
output_data_append: None,
9797
target_table_empty,
9898
delete_when,
@@ -109,12 +109,12 @@ impl ReplaceIntoProcessor {
109109
#[allow(dead_code)]
110110
pub fn into_pipe_item(self) -> PipeItem {
111111
let input = self.input_port.clone();
112-
let output_port_merge_into_action = self.output_port_merge_into_action.clone();
112+
let output_port_replace_into_action = self.output_port_replace_into_action.clone();
113113
let output_port_append_data = self.output_port_append_data.clone();
114114
let processor_ptr = ProcessorPtr::create(Box::new(self));
115115
PipeItem::create(processor_ptr, vec![input], vec![
116116
output_port_append_data,
117-
output_port_merge_into_action,
117+
output_port_replace_into_action,
118118
])
119119
}
120120
}
@@ -131,10 +131,10 @@ impl Processor for ReplaceIntoProcessor {
131131
fn event(&mut self) -> Result<Event> {
132132
let finished = self.input_port.is_finished()
133133
&& self.output_data_append.is_none()
134-
&& self.output_data_merge_into_action.is_none();
134+
&& self.output_data_replace_into_action.is_none();
135135

136136
if finished {
137-
self.output_port_merge_into_action.finish();
137+
self.output_port_replace_into_action.finish();
138138
self.output_port_append_data.finish();
139139
return Ok(Event::Finished);
140140
}
@@ -147,9 +147,9 @@ impl Processor for ReplaceIntoProcessor {
147147
}
148148
}
149149

150-
if self.output_port_merge_into_action.can_push() {
151-
if let Some(data) = self.output_data_merge_into_action.take() {
152-
self.output_port_merge_into_action.push_data(Ok(data));
150+
if self.output_port_replace_into_action.can_push() {
151+
if let Some(data) = self.output_data_replace_into_action.take() {
152+
self.output_port_replace_into_action.push_data(Ok(data));
153153
pushed_something = true;
154154
}
155155
}
@@ -162,7 +162,8 @@ impl Processor for ReplaceIntoProcessor {
162162
}
163163

164164
if self.input_port.has_data() {
165-
if self.output_data_append.is_none() && self.output_data_merge_into_action.is_none()
165+
if self.output_data_append.is_none()
166+
&& self.output_data_replace_into_action.is_none()
166167
{
167168
// no pending data (being sent to down streams)
168169
self.input_data = Some(self.input_port.pull_data().unwrap()?);
@@ -207,12 +208,12 @@ impl Processor for ReplaceIntoProcessor {
207208
.collect::<HashSet<_>>();
208209
data_block = data_block.project(&projections);
209210
};
210-
let merge_into_action = self.replace_into_mutator.process_input_block(&data_block)?;
211+
let replace_into_action = self.replace_into_mutator.process_input_block(&data_block)?;
211212
metrics_inc_replace_process_input_block_time_ms(start.elapsed().as_millis() as u64);
212213
metrics_inc_replace_block_number_input(1);
213214
if !self.target_table_empty {
214-
self.output_data_merge_into_action =
215-
Some(DataBlock::empty_with_meta(Box::new(merge_into_action)));
215+
self.output_data_replace_into_action =
216+
Some(DataBlock::empty_with_meta(Box::new(replace_into_action)));
216217
}
217218

218219
if all_delete {

0 commit comments

Comments
 (0)