Skip to content

chore: add merge_into_resize_parallel_threads settings #15048

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,11 @@ impl PipelineBuilder {
} = merge_into_source;

self.build_pipeline(input)?;
self.main_pipeline
.try_resize(self.ctx.get_settings().get_max_threads()? as usize)?;
self.main_pipeline.try_resize(
self.ctx
.get_settings()
.get_merge_into_resize_parallel_threads()? as usize,
)?;
// 1. if matchedOnly, we will use inner join
// 2. if insert Only, we will use right anti join
// 3. other cases, we use right outer join
Expand Down
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,12 @@ impl DefaultSettings {
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("merge_into_resize_parallel_threads", DefaultSettingValue {
Copy link
Member

@BohuTANG BohuTANG Mar 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this value should be adjust based on CPUs not a setting, we actually don't know what's the value to set.

Copy link
Member

@BohuTANG BohuTANG Mar 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, we have changed the adjust_io_request multiple times, returning to the simplest method in the end:
https://github.com/datafuselabs/databend/blob/4a55ea429a148a11ca365983976abc63363e4ae5/src/query/storages/fuse/src/operations/read_data.rs#L82-L92

Copy link
Contributor Author

@JackTan25 JackTan25 Mar 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/datafuselabs/databend/pull/15048/files#diff-31afd04d842c6c44f5b86a534efe227d1bd19c04b8dbad24e4cfdae42f33300dR448 see here, the default is cpu threads, and we won't change the previous query time, it will change the merge into i/o pipeline. I need to modify the merge into to make it more tunable. It's to control the block write threads.In fact to reduce blocks writes time, not the i/o times at the same time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the i/o reuqust is used to control the number of i/o at the same time, but it can't control the whole number of complete pipeline, we need to control the block compact and reduce it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Is this setting only for our testing not the user?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pr need to check, But in default, it won't make effect, I and @wubx find out the i/o is very high. So we need this. Let me do a cloud test with him.

value: UserSettingValue::UInt64(0),
desc: "tune the source parallel when too many small blocks",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),
("enable_distributed_merge_into", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Enables distributed execution for 'MERGE INTO'.",
Expand Down
7 changes: 7 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,13 @@ impl Settings {
Ok(self.try_get_u64("enable_distributed_copy_into")? != 0)
}

pub fn get_merge_into_resize_parallel_threads(&self) -> Result<u64> {
match self.try_get_u64("merge_into_resize_parallel_threads")? {
0 => Ok(self.get_max_threads()?),
value => Ok(std::cmp::min(self.get_max_threads()?, value)),
}
}

pub fn get_enable_experimental_merge_into(&self) -> Result<bool> {
Ok(self.try_get_u64("enable_experimental_merge_into")? != 0)
}
Expand Down
Loading