diff --git a/Cargo.lock b/Cargo.lock
index 42859ffe85455..a99d070bfbd71 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2939,6 +2939,8 @@ dependencies = [
  "bytemuck",
  "bytes",
  "bytesize",
+ "chrono",
+ "concurrent-queue",
  "crc32fast",
  "ctrlc",
  "databend-common-exception",
diff --git a/src/common/base/Cargo.toml b/src/common/base/Cargo.toml
index 26547eb91fee1..303ee144f82b5 100644
--- a/src/common/base/Cargo.toml
+++ b/src/common/base/Cargo.toml
@@ -25,6 +25,8 @@ borsh = { workspace = true }
 bytemuck = { workspace = true }
 bytes = { workspace = true }
 bytesize = { workspace = true }
+chrono = { workspace = true }
+concurrent-queue = { workspace = true }
 crc32fast = { workspace = true }
 ctrlc = { workspace = true }
 enquote = { workspace = true }
diff --git a/src/common/base/src/runtime/mod.rs b/src/common/base/src/runtime/mod.rs
index d809ac2792525..3776957ab6ecd 100644
--- a/src/common/base/src/runtime/mod.rs
+++ b/src/common/base/src/runtime/mod.rs
@@ -24,6 +24,7 @@ pub mod profile;
 mod runtime;
 mod runtime_tracker;
 mod thread;
+mod time_series;
 
 pub use backtrace::dump_backtrace;
 pub use backtrace::get_all_tasks;
@@ -60,3 +61,11 @@ pub use runtime_tracker::TrackingPayload;
 pub use runtime_tracker::UnlimitedFuture;
 pub use thread::Thread;
 pub use thread::ThreadJoinHandle;
+pub use time_series::compress_time_point;
+pub use time_series::get_time_series_profile_desc;
+pub use time_series::ProfilePoints;
+pub use time_series::QueryTimeSeriesProfile;
+pub use time_series::QueryTimeSeriesProfileBuilder;
+pub use time_series::TimeSeriesProfileDesc;
+pub use time_series::TimeSeriesProfileName;
+pub use time_series::TimeSeriesProfiles;
diff --git a/src/common/base/src/runtime/runtime_tracker.rs b/src/common/base/src/runtime/runtime_tracker.rs
index f71fe9ca2cfbd..a1552bfed6457 100644
--- a/src/common/base/src/runtime/runtime_tracker.rs
+++ b/src/common/base/src/runtime/runtime_tracker.rs
@@ -55,7 +55,9 @@ use crate::runtime::memory::GlobalStatBuffer;
 use crate::runtime::memory::MemStat;
 use crate::runtime::metrics::ScopedRegistry;
 use crate::runtime::profile::Profile;
+use crate::runtime::time_series::QueryTimeSeriesProfile;
 use crate::runtime::MemStatBuffer;
+use crate::runtime::TimeSeriesProfiles;
 
 // For implemented and needs to call drop, we cannot use the attribute tag thread local.
 // https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=ea33533387d401e86423df1a764b5609
@@ -104,6 +106,8 @@ pub struct TrackingPayload {
     pub mem_stat: Option<Arc<MemStat>>,
     pub metrics: Option<Arc<ScopedRegistry>>,
     pub should_log: bool,
+    pub time_series_profile: Option<Arc<QueryTimeSeriesProfile>>,
+    pub local_time_series_profile: Option<Arc<TimeSeriesProfiles>>,
 }
 
 pub struct TrackingGuard {
@@ -165,6 +169,8 @@ impl ThreadTracker {
                 mem_stat: None,
                 query_id: None,
                 should_log: true,
+                time_series_profile: None,
+                local_time_series_profile: None,
             },
         }
     }
diff --git a/src/common/base/src/runtime/time_series/mod.rs b/src/common/base/src/runtime/time_series/mod.rs
new file mode 100644
index 0000000000000..8fe3e1455557b
--- /dev/null
+++ b/src/common/base/src/runtime/time_series/mod.rs
@@ -0,0 +1,25 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod profile;
+mod query_profile;
+
+pub use profile::compress_time_point;
+pub use profile::get_time_series_profile_desc;
+pub use profile::ProfilePoints;
+pub use profile::TimeSeriesProfileDesc;
+pub use profile::TimeSeriesProfileName;
+pub use profile::TimeSeriesProfiles;
+pub use query_profile::QueryTimeSeriesProfile;
+pub use query_profile::QueryTimeSeriesProfileBuilder;
diff --git a/src/common/base/src/runtime/time_series/profile.rs b/src/common/base/src/runtime/time_series/profile.rs
new file mode 100644
index 0000000000000..d2598afdc083a
--- /dev/null
+++ b/src/common/base/src/runtime/time_series/profile.rs
@@ -0,0 +1,224 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::mem;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+use std::sync::Arc;
+
+use concurrent_queue::ConcurrentQueue;
+use once_cell::sync::OnceCell;
+use serde::Serialize;
+
+// 1 second in milliseconds
+const DEFAULT_INTERVAL: usize = 1000;
+
+// DataPoint is a tuple of (timestamp, value)
+type DataPoint = (usize, usize);
+
+pub enum TimeSeriesProfileName {
+    OutputRows,
+    OutputBytes,
+}
+
+#[derive(Serialize)]
+pub struct TimeSeriesProfileDesc {
+    name: &'static str,
+    index: u32,
+}
+pub static TIME_SERIES_PROFILES_DESC: OnceCell<Arc<Vec<TimeSeriesProfileDesc>>> = OnceCell::new();
+
+pub fn get_time_series_profile_desc() -> Arc<Vec<TimeSeriesProfileDesc>> {
+    TIME_SERIES_PROFILES_DESC
+        .get_or_init(|| {
+            Arc::new(vec![
+                TimeSeriesProfileDesc {
+                    name: "OutputRows",
+                    index: TimeSeriesProfileName::OutputRows as u32,
+                },
+                TimeSeriesProfileDesc {
+                    name: "OutputBytes",
+                    index: TimeSeriesProfileName::OutputBytes as u32,
+                },
+            ])
+        })
+        .clone()
+}
+
+pub struct ProfilePoints {
+    pub points: ConcurrentQueue<DataPoint>,
+    pub value: AtomicUsize,
+    pub last_check_timestamp: AtomicUsize,
+}
+
+pub struct TimeSeriesProfiles {
+    pub profiles: Vec<ProfilePoints>,
+}
+
+impl ProfilePoints {
+    pub fn new() -> Self {
+        ProfilePoints {
+            points: ConcurrentQueue::unbounded(),
+            last_check_timestamp: AtomicUsize::new(0),
+            value: AtomicUsize::new(0),
+        }
+    }
+
+    pub fn record_time_slot(&self, now: usize, value: usize) -> bool {
+        let mut is_record = false;
+        let mut current_last_check = 0;
+        loop {
+            match self.last_check_timestamp.compare_exchange_weak(
+                current_last_check,
+                now,
+                SeqCst,
+                SeqCst,
+            ) {
+                Ok(_) => {
+                    if current_last_check == 0 {
+                        // The first record: it will be flushed in the next time slot
+                        break;
+                    }
+                    if now == current_last_check {
+                        // Still in the same slot
+                        break;
+                    }
+                    let last_value = self.value.swap(0, SeqCst);
+                    let _ = self.points.push((current_last_check, last_value));
+                    is_record = true;
+                    break;
+                }
+                Err(last_record) => {
+                    if now < last_record {
+                        // In a concurrent situation, `now` can be earlier than `last_record`,
+                        // which means we missed the time slot: it has already been pushed into
+                        // the points queue. We just push the value into the queue again;
+                        // duplicates will be merged during flush.
+                        let _ = self.points.push((now, value));
+                        // Early return: avoid adding the value into the current time slot
+                        return true;
+                    }
+                    current_last_check = last_record;
+                }
+            }
+        }
+        self.value.fetch_add(value, SeqCst);
+        is_record
+    }
+}
+
+impl Default for ProfilePoints {
+    fn default() -> Self {
+        Self::new()
+    }
+}
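The CAS loop above is subtle, so here is a standalone sketch of its contract (not part of the diff; slot numbers are hand-picked for illustration, and `ProfilePoints` is used via its re-export from `databend_common_base::runtime`):

```rust
use databend_common_base::runtime::ProfilePoints;

fn main() {
    let points = ProfilePoints::new();

    // First ever record: slot 1000 is only remembered, nothing is queued yet.
    points.record_time_slot(1000, 5);
    // Same slot: the value just accumulates (5 + 7).
    points.record_time_slot(1000, 7);
    assert_eq!(points.points.len(), 0);

    // First record in slot 1001: the finished slot (1000, 12) is pushed into
    // the queue, and the counter restarts at 3.
    let rolled = points.record_time_slot(1001, 3);
    assert!(rolled);
    assert_eq!(points.points.pop().unwrap(), (1000, 12));
}
```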
+
+impl TimeSeriesProfiles {
+    pub fn new() -> Self {
+        let type_num = mem::variant_count::<TimeSeriesProfileName>();
+        TimeSeriesProfiles {
+            profiles: Self::create_profiles(type_num),
+        }
+    }
+
+    fn create_profiles(type_num: usize) -> Vec<ProfilePoints> {
+        let mut profiles = Vec::with_capacity(type_num);
+        for _ in 0..type_num {
+            profiles.push(ProfilePoints::new());
+        }
+        profiles
+    }
+
+    pub fn record(&self, name: TimeSeriesProfileName, value: usize) -> bool {
+        let profile = &self.profiles[name as usize];
+        let now = chrono::Local::now().timestamp_millis() as usize / DEFAULT_INTERVAL;
+        profile.record_time_slot(now, value)
+    }
+
+    pub fn flush(&self, finish: bool, quota: &mut i32) -> Vec<Vec<Vec<usize>>> {
+        let mut batch = Vec::with_capacity(self.profiles.len());
+        for profile in self.profiles.iter() {
+            if *quota == 0 && !finish {
+                break;
+            }
+            if finish {
+                // If the flush is triggered by finish, we also need to flush the last record
+                let last_timestamp = profile.last_check_timestamp.load(SeqCst);
+                let last_value = profile.value.swap(0, SeqCst);
+                if last_value != 0 && last_timestamp != 0 {
+                    let _ = profile.points.push((last_timestamp, last_value));
+                }
+            }
+            let mut points = Vec::with_capacity(profile.points.len());
+            while let Ok(point) = profile.points.pop() {
+                points.push(point);
+                *quota -= 1;
+                if *quota == 0 && !finish {
+                    break;
+                }
+            }
+            batch.push(compress_time_point(&points));
+        }
+        batch
+    }
+}
+
+impl Default for TimeSeriesProfiles {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Compresses a sequence of data points (`Vec<DataPoint>`, i.e., a list of (timestamp, value) pairs)
+/// into a more compact format: `Vec<Vec<usize>>`.
+///
+/// Compressed format description:
+/// - Each inner `Vec<usize>` represents a segment of consecutive timestamps.
+/// - The first element of each inner `Vec<usize>` is the starting timestamp (start_time) of the segment.
+/// - The following elements are the values corresponding to each consecutive timestamp in that segment.
+///
+/// Example:
+/// given the original data:
+/// `[(1744971865,100), (1744971866,200), (1744971867,50), (1744971868,150), (1744971870,20), (1744971871,40)]`
+/// the compressed result will be:
+/// `[[1744971865, 100, 200, 50, 150], [1744971870, 20, 40]]`
+///
+/// Note:
+/// Why we use `[timestamp, value0, value1, value2]` instead of `[timestamp, (value0, value1, value2)]`:
+/// serde_json serializes a Rust tuple as a JSON list, so `[timestamp, (value0, value1, value2)]` would be
+/// converted to `[timestamp, value0, value1, value2]` after serialization anyway.
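The doc-comment example doubles as a quick sanity check. A runnable sketch against the re-exported `compress_time_point` (not part of the diff):

```rust
use databend_common_base::runtime::compress_time_point;

fn main() {
    let points = vec![
        (1744971865, 100),
        (1744971866, 200),
        (1744971867, 50),
        (1744971868, 150),
        (1744971870, 20),
        (1744971871, 40),
    ];
    // Two segments: four consecutive seconds, a gap, then two more.
    assert_eq!(compress_time_point(&points), vec![
        vec![1744971865, 100, 200, 50, 150],
        vec![1744971870, 20, 40],
    ]);
}
```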
+/// See: https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=3c153dfcfdde3032c80c05f4010f3d0f
+pub fn compress_time_point(points: &[DataPoint]) -> Vec<Vec<usize>> {
+    let mut result = Vec::new();
+    let mut i = 0;
+    while i < points.len() {
+        let (start_time, value) = points[i];
+        let mut group = Vec::new();
+        group.push(start_time);
+        group.push(value);
+        let mut j = i + 1;
+        while j < points.len()
+            && (points[j].0 == points[j - 1].0 + 1 || points[j].0 == points[j - 1].0)
+        {
+            let mut v = points[j].1;
+            if points[j].0 == points[j - 1].0 {
+                v += group.pop().unwrap();
+            }
+            group.push(v);
+            j += 1;
+        }
+        result.push(group);
+        i = j;
+    }
+    result
+}
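`should_flush` below is a lock-free modulo counter: every call advances `global_count` by one, wrapping at `DEFAULT_BATCH_SIZE` (1024), and only the call that wraps it back to zero is told to flush. A standalone sketch of that contract (not part of the diff):

```rust
use std::sync::atomic::AtomicUsize;

use databend_common_base::runtime::QueryTimeSeriesProfile;

fn main() {
    let global_count = AtomicUsize::new(0);
    // The first 1023 calls only advance the counter...
    assert!((0..1023).all(|_| !QueryTimeSeriesProfile::should_flush(&global_count)));
    // ...and the 1024th wraps it to zero and requests a flush.
    assert!(QueryTimeSeriesProfile::should_flush(&global_count));
}
```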
diff --git a/src/common/base/src/runtime/time_series/query_profile.rs b/src/common/base/src/runtime/time_series/query_profile.rs
new file mode 100644
index 0000000000000..a47a34e13e2d2
--- /dev/null
+++ b/src/common/base/src/runtime/time_series/query_profile.rs
@@ -0,0 +1,157 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::BTreeMap;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+use std::sync::Arc;
+
+use log::info;
+use serde::Serialize;
+
+use crate::runtime::get_time_series_profile_desc;
+use crate::runtime::time_series::profile::TimeSeriesProfileName;
+use crate::runtime::time_series::profile::TimeSeriesProfiles;
+use crate::runtime::ThreadTracker;
+use crate::runtime::TimeSeriesProfileDesc;
+
+const DEFAULT_BATCH_SIZE: usize = 1024;
+
+pub struct QueryTimeSeriesProfile {
+    pub global_count: AtomicUsize,
+    pub plans_profiles: Vec<(u32, Arc<TimeSeriesProfiles>)>,
+    pub query_id: String,
+}
+
+impl QueryTimeSeriesProfile {
+    pub fn record_time_series_profile(name: TimeSeriesProfileName, value: usize) {
+        ThreadTracker::with(
+            |x| match x.borrow().payload.local_time_series_profile.as_ref() {
+                None => {}
+                Some(profile) => {
+                    if profile.record(name, value) {
+                        if let Some(global_profile) =
+                            x.borrow().payload.time_series_profile.as_ref()
+                        {
+                            let should_flush = Self::should_flush(&global_profile.global_count);
+                            if should_flush {
+                                global_profile.flush(false);
+                            }
+                        }
+                    }
+                }
+            },
+        )
+    }
+
+    pub fn should_flush(global_count: &AtomicUsize) -> bool {
+        let mut prev = 0;
+        loop {
+            let next = if prev == DEFAULT_BATCH_SIZE - 1 {
+                0
+            } else {
+                prev + 1
+            };
+            match global_count.compare_exchange_weak(prev, next, SeqCst, SeqCst) {
+                Ok(_) => {
+                    if next == 0 {
+                        return true;
+                    }
+                    return false;
+                }
+                Err(next_prev) => {
+                    prev = next_prev;
+                }
+            }
+        }
+    }
+
+    pub fn flush(&self, finish: bool) {
+        #[derive(Serialize)]
+        struct QueryTimeSeries {
+            query_id: String,
+            plans: Vec<PlanTimeSeries>,
+            desc: Arc<Vec<TimeSeriesProfileDesc>>,
+        }
+        #[derive(Serialize)]
+        struct PlanTimeSeries {
+            plan_id: u32,
+            data: Vec<Vec<Vec<usize>>>,
+        }
+        let mut quota = DEFAULT_BATCH_SIZE as i32;
+        let mut plans = Vec::with_capacity(self.plans_profiles.len());
+        for (plan_id, plan_profile) in self.plans_profiles.iter() {
+            if quota == 0 && !finish {
+                break;
+            }
+            let profile_time_series_vec = plan_profile.flush(finish, &mut quota);
+            plans.push(PlanTimeSeries {
+                plan_id: *plan_id,
+                data: profile_time_series_vec,
+            });
+        }
+        if quota == DEFAULT_BATCH_SIZE as i32 {
+            return;
+        }
+        let query_time_series = QueryTimeSeries {
+            query_id: self.query_id.clone(),
+            plans,
+            desc: get_time_series_profile_desc(),
+        };
+        let json = serde_json::to_string(&query_time_series).unwrap();
+        info!(target: "databend::log::time_series", "{}", json);
+    }
+}
+
+impl Drop for QueryTimeSeriesProfile {
+    fn drop(&mut self) {
+        self.flush(true);
+    }
+}
+
+pub struct QueryTimeSeriesProfileBuilder {
+    plans_profile: BTreeMap<u32, Arc<TimeSeriesProfiles>>,
+    query_id: String,
+}
+
+impl QueryTimeSeriesProfileBuilder {
+    pub fn new(query_id: String) -> Self {
+        QueryTimeSeriesProfileBuilder {
+            plans_profile: BTreeMap::new(),
+            query_id,
+        }
+    }
+
+    pub fn register_time_series_profile(&mut self, plan_id: u32) -> Arc<TimeSeriesProfiles> {
+        if !self.plans_profile.contains_key(&plan_id) {
+            let profile = Arc::new(TimeSeriesProfiles::new());
+            self.plans_profile.insert(plan_id, profile.clone());
+            profile
+        } else {
+            self.plans_profile.get(&plan_id).unwrap().clone()
+        }
+    }
+
+    pub fn build(&self) -> QueryTimeSeriesProfile {
+        QueryTimeSeriesProfile {
+            global_count: AtomicUsize::new(0),
+            plans_profiles: self
+                .plans_profile
+                .iter()
+                .map(|(k, v)| (*k, v.clone()))
+                .collect(),
+            query_id: self.query_id.clone(),
+        }
+    }
+}
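Taken together, the intended lifecycle outside the executor looks roughly like the sketch below (not part of the diff; the query id, plan id, and values are made up, and the JSON in the comment is the shape implied by the `Serialize` structs above, not captured output):

```rust
use std::sync::Arc;

use databend_common_base::runtime::QueryTimeSeriesProfileBuilder;
use databend_common_base::runtime::TimeSeriesProfileName;

fn main() {
    let mut builder = QueryTimeSeriesProfileBuilder::new("q-sketch".to_string());
    // One TimeSeriesProfiles per plan id; re-registering returns the same Arc.
    let plan_profile = builder.register_time_series_profile(42);
    let query_profile = Arc::new(builder.build());

    plan_profile.record(TimeSeriesProfileName::OutputRows, 100);

    // Logs one line on the "databend::log::time_series" target, shaped like:
    // {"query_id":"q-sketch",
    //  "plans":[{"plan_id":42,"data":[[[<ts>,100]],[]]}],
    //  "desc":[{"name":"OutputRows","index":0},{"name":"OutputBytes","index":1}]}
    // Dropping the profile would flush the remainder as well.
    query_profile.flush(true);
}
```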
diff --git a/src/common/base/tests/it/main.rs b/src/common/base/tests/it/main.rs
index d40345946826f..9ec23a32818be 100644
--- a/src/common/base/tests/it/main.rs
+++ b/src/common/base/tests/it/main.rs
@@ -24,6 +24,7 @@ mod range_merger;
 mod runtime;
 mod stoppable;
 mod string;
+mod time_series;
 
 // runtime tests depends on the memory stat collector.
 #[global_allocator]
diff --git a/src/common/base/tests/it/time_series/mod.rs b/src/common/base/tests/it/time_series/mod.rs
new file mode 100644
index 0000000000000..df8b6ea489d98
--- /dev/null
+++ b/src/common/base/tests/it/time_series/mod.rs
@@ -0,0 +1,15 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod profile;
diff --git a/src/common/base/tests/it/time_series/profile.rs b/src/common/base/tests/it/time_series/profile.rs
new file mode 100644
index 0000000000000..e2cdaf64e7806
--- /dev/null
+++ b/src/common/base/tests/it/time_series/profile.rs
@@ -0,0 +1,239 @@
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+use std::thread;
+use std::time::Duration;
+
+use databend_common_base::runtime::compress_time_point;
+use databend_common_base::runtime::ProfilePoints;
+use databend_common_base::runtime::QueryTimeSeriesProfile;
+use databend_common_base::runtime::TimeSeriesProfileName;
+use databend_common_base::runtime::TimeSeriesProfiles;
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+use databend_common_exception::Result;
+
+#[test]
+fn test_time_series_profile_record() -> Result<()> {
+    let profile = TimeSeriesProfiles::new();
+
+    // 1. For the first record, we only increase the inner counter
+    profile.record(TimeSeriesProfileName::OutputBytes, 1000);
+    let value = profile.profiles[TimeSeriesProfileName::OutputBytes as usize]
+        .value
+        .load(SeqCst);
+    let len = profile.profiles[TimeSeriesProfileName::OutputBytes as usize]
+        .points
+        .len();
+    assert_eq!(value, 1000);
+    assert_eq!(len, 0);
+
+    thread::sleep(Duration::from_secs(1));
+
+    // 2. For the next time slot, we add a new point;
+    // after that, the counter is reset
+    profile.record(TimeSeriesProfileName::OutputBytes, 1234);
+    let value = profile.profiles[TimeSeriesProfileName::OutputBytes as usize]
+        .value
+        .load(SeqCst);
+    let len = profile.profiles[TimeSeriesProfileName::OutputBytes as usize]
+        .points
+        .len();
+    assert_eq!(value, 1234);
+    assert_eq!(len, 1);
+    // the first point should be 1000
+    let first = profile.profiles[TimeSeriesProfileName::OutputBytes as usize]
+        .points
+        .pop()
+        .unwrap();
+    assert_ne!(first.0, 0);
+    assert_eq!(first.1, 1000);
+
+    Ok(())
+}
+
+#[test]
+fn test_compress_time_point_basic() {
+    let input = vec![
+        (1744971865, 100),
+        (1744971866, 200),
+        (1744971867, 50),
+        (1744971868, 150),
+        (1744971870, 20),
+        (1744971871, 40),
+    ];
+    let expected = vec![vec![1744971865, 100, 200, 50, 150], vec![
+        1744971870, 20, 40,
+    ]];
+    assert_eq!(compress_time_point(&input), expected);
+}
+
+#[test]
+fn test_compress_time_point_special_duplicate_merge() {
+    let input = vec![
+        (1744971865, 100),
+        (1744971866, 200),
+        // same timestamp: the two values will be merged
+        (1744971867, 50),
+        (1744971867, 123),
+        (1744971868, 150),
+        (1744971870, 20),
+        (1744971871, 40),
+    ];
+    let expected = vec![vec![1744971865, 100, 200, 50 + 123, 150], vec![
+        1744971870, 20, 40,
+    ]];
+    assert_eq!(compress_time_point(&input), expected);
+}
+
+#[test]
+fn test_compress_time_point_special_invalid() {
+    let input = vec![
+        (1744971865, 100),
+        (1744971866, 200),
+        (1744971867, 50),
+        // extreme case: we do not merge it
+        (1744971865, 123),
+        (1744971868, 150),
+        (1744971870, 20),
+        (1744971871, 40),
+    ];
+    let expected = vec![
+        vec![1744971865, 100, 200, 50],
+        vec![1744971865, 123],
+        vec![1744971868, 150],
+        vec![1744971870, 20, 40],
+    ];
+    assert_eq!(compress_time_point(&input), expected);
+}
+
+#[test]
+fn test_compress_time_point_no_consecutive() {
+    let input = vec![(1744971865, 10), (1744971867, 20), (1744971869, 30)];
+    let expected = vec![vec![1744971865, 10], vec![1744971867, 20], vec![
+        1744971869, 30,
+    ]];
+    assert_eq!(compress_time_point(&input), expected);
+}
+
+#[test]
+fn test_finish_flush() {
+    let profile = TimeSeriesProfiles::new();
+
+    profile.record(TimeSeriesProfileName::OutputBytes, 2000);
+    profile.record(TimeSeriesProfileName::OutputRows, 1000);
+
+    thread::sleep(Duration::from_secs(1));
+
+    // A finishing flush reads these from `profile.value` and appends them to the points
+    profile.record(TimeSeriesProfileName::OutputBytes, 2);
+    profile.record(TimeSeriesProfileName::OutputRows, 1);
+
+    let batch = profile.flush(true, &mut 4);
+
+    assert_eq!(batch.len(), 2);
+    // [[timestamp, 1000, 1]]
+    assert_eq!(batch[0][0].len(), 3);
+    // [[timestamp, 2000, 2]]
+    assert_eq!(batch[1][0].len(), 3);
+}
+
+#[test]
+fn test_record_inner_basic() -> Result<()> {
+    let points = ProfilePoints::new();
+    let now = chrono::Utc::now().timestamp() as usize;
+
+    // Simulate recording in the same time slot
+    for i in 0..10 {
+        points.record_time_slot(now, i);
+    }
+    assert_eq!(points.points.len(), 0);
+    assert_eq!(points.value.load(SeqCst), (0..10).sum::<usize>());
+
+    // Next time slot
+    for i in 0..100 {
+        points.record_time_slot(now + 1, i);
+    }
+    assert_eq!(points.points.len(), 1);
+    let x = points.points.pop().unwrap();
+    assert_eq!(x.0, now);
+    assert_eq!(x.1, (0..10).sum::<usize>());
+    assert_eq!(points.value.load(SeqCst), (0..100).sum::<usize>());
+    Ok(())
+}
+
+#[test]
+fn test_record_inner_special() -> Result<()> {
+    // Simulate concurrent recording where one thread is late
+    let points = ProfilePoints::new();
+    let now = 1000000001_usize;
+    for i in 0..10 {
+        points.record_time_slot(now, i);
+    }
+    points.record_time_slot(now + 1, 123);
+    points.record_time_slot(now, 456);
+    points.record_time_slot(now + 2, 789);
+
+    let v = points.points.try_iter().collect::<Vec<_>>();
+
+    assert_eq!(
+        format!("{:?}", v),
+        "[(1000000001, 45), (1000000001, 456), (1000000002, 123)]"
+    );
+
+    assert_eq!(points.value.load(SeqCst), 789);
+    Ok(())
+}
+
+#[test]
+fn test_record_inner_special_invalid() -> Result<()> {
+    // Simulate concurrent recording where one thread is more than 1 second late
+    let points = ProfilePoints::new();
+    let now = 1000000001_usize;
+    for i in 0..10 {
+        points.record_time_slot(now, i);
+    }
+    points.record_time_slot(now + 1, 123);
+    points.record_time_slot(now - 2, 456);
+    points.record_time_slot(now + 2, 789);
+
+    let v = points.points.try_iter().collect::<Vec<_>>();
+
+    assert_eq!(
+        format!("{:?}", v),
+        "[(1000000001, 45), (999999999, 456), (1000000002, 123)]"
+    );
+
+    assert_eq!(points.value.load(SeqCst), 789);
+    Ok(())
+}
+
+#[test]
+fn test_should_flush() -> Result<()> {
+    let global_count = AtomicUsize::new(0);
+    for _i in 0..1023 {
+        let should_flush = QueryTimeSeriesProfile::should_flush(&global_count);
+        assert!(!should_flush);
+    }
+    let should_flush = QueryTimeSeriesProfile::should_flush(&global_count);
+    assert!(should_flush);
+    assert_eq!(global_count.load(SeqCst), 0);
+    for _i in 0..1023 {
+        let should_flush = QueryTimeSeriesProfile::should_flush(&global_count);
+        assert!(!should_flush);
+    }
+    let should_flush = QueryTimeSeriesProfile::should_flush(&global_count);
+    assert!(should_flush);
+    assert_eq!(global_count.load(SeqCst), 0);
+
+    Ok(())
+}
diff --git a/src/common/tracing/src/init.rs b/src/common/tracing/src/init.rs
index f75d2d3def2cc..b3b8db02b55d0 100644
--- a/src/common/tracing/src/init.rs
+++ b/src/common/tracing/src/init.rs
@@ -108,6 +108,7 @@ fn env_filter(level: &str) -> EnvFilter {
             .filter(Some("databend::log::query"), LevelFilter::Off)
             .filter(Some("databend::log::profile"), LevelFilter::Off)
             .filter(Some("databend::log::structlog"), LevelFilter::Off)
+            .filter(Some("databend::log::time_series"), LevelFilter::Off)
             .parse(level),
     )
 }
diff --git a/src/query/pipeline/core/src/processors/port.rs b/src/query/pipeline/core/src/processors/port.rs
index 372180ff52b43..6313f261f4dab 100644
--- a/src/query/pipeline/core/src/processors/port.rs
+++ b/src/query/pipeline/core/src/processors/port.rs
@@ -19,6 +19,8 @@ use std::sync::Arc;
 use databend_common_base::runtime::drop_guard;
 use databend_common_base::runtime::profile::Profile;
 use databend_common_base::runtime::profile::ProfileStatisticsName;
+use databend_common_base::runtime::QueryTimeSeriesProfile;
+use databend_common_base::runtime::TimeSeriesProfileName;
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
 
@@ -234,10 +236,18 @@ impl OutputPort {
                     ProfileStatisticsName::OutputRows,
                     data_block.num_rows(),
                 );
+                QueryTimeSeriesProfile::record_time_series_profile(
+                    TimeSeriesProfileName::OutputRows,
+                    data_block.num_rows(),
+                );
                 Profile::record_usize_profile(
                     ProfileStatisticsName::OutputBytes,
                     data_block.memory_size(),
                 );
+                QueryTimeSeriesProfile::record_time_series_profile(
+                    TimeSeriesProfileName::OutputBytes,
+                    data_block.memory_size(),
+                );
             }
         }
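Note that `record_time_series_profile` consults the thread-local tracking payload, so on a thread with no tracked query it takes the `None => {}` arm and is a cheap no-op; ports in untracked pipelines pay almost nothing. A standalone sketch (not part of the diff):

```rust
use databend_common_base::runtime::QueryTimeSeriesProfile;
use databend_common_base::runtime::TimeSeriesProfileName;

fn main() {
    // No tracking payload is installed here, so local_time_series_profile is
    // None and this call records nothing at all.
    QueryTimeSeriesProfile::record_time_series_profile(TimeSeriesProfileName::OutputRows, 100);
}
```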
diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs
index c7b2c27272aeb..b0d2badef785f 100644
--- a/src/query/service/src/pipelines/executor/executor_graph.rs
+++ b/src/query/service/src/pipelines/executor/executor_graph.rs
@@ -28,7 +28,9 @@ use databend_common_base::base::WatchNotify;
 use databend_common_base::runtime::error_info::NodeErrorType;
 use databend_common_base::runtime::profile::Profile;
 use databend_common_base::runtime::profile::ProfileStatisticsName;
+use databend_common_base::runtime::QueryTimeSeriesProfileBuilder;
 use databend_common_base::runtime::ThreadTracker;
+use databend_common_base::runtime::TimeSeriesProfiles;
 use databend_common_base::runtime::TrackingPayload;
 use databend_common_base::runtime::TrySpawn;
 use databend_common_exception::ErrorCode;
@@ -98,6 +100,7 @@ impl Node {
         processor: &ProcessorPtr,
         inputs_port: &[Arc<InputPort>],
         outputs_port: &[Arc<OutputPort>],
+        time_series_profile: Option<Arc<TimeSeriesProfiles>>,
     ) -> Arc<Node> {
         let p_name = unsafe { processor.name() };
         let tracking_payload = {
@@ -124,6 +127,8 @@
             // Node tracking metrics
             tracking_payload.metrics = scope.as_ref().map(|x| x.metrics_registry.clone());
 
+            tracking_payload.local_time_series_profile = time_series_profile;
+
             tracking_payload
         };
 
@@ -185,7 +190,9 @@
         finish_condvar_notify: Option<Arc<(Mutex<bool>, Condvar)>>,
     ) -> Result<ExecutingGraph> {
         let mut graph = StableGraph::new();
-        Self::init_graph(&mut pipeline, &mut graph);
+        let mut time_series_profile_builder =
+            QueryTimeSeriesProfileBuilder::new(query_id.to_string());
+        Self::init_graph(&mut pipeline, &mut graph, &mut time_series_profile_builder);
         Ok(ExecutingGraph {
             graph,
             finished_nodes: AtomicUsize::new(0),
@@ -206,9 +213,10 @@
         finish_condvar_notify: Option<Arc<(Mutex<bool>, Condvar)>>,
     ) -> Result<ExecutingGraph> {
         let mut graph = StableGraph::new();
-
+        let mut time_series_profile_builder =
+            QueryTimeSeriesProfileBuilder::new(query_id.to_string());
         for pipeline in &mut pipelines {
-            Self::init_graph(pipeline, &mut graph);
+            Self::init_graph(pipeline, &mut graph, &mut time_series_profile_builder);
         }
 
         Ok(ExecutingGraph {
@@ -224,7 +232,11 @@
         })
     }
 
-    fn init_graph(pipeline: &mut Pipeline, graph: &mut StableGraph<Arc<Node>, EdgeInfo>) {
+    fn init_graph(
+        pipeline: &mut Pipeline,
+        graph: &mut StableGraph<Arc<Node>, EdgeInfo>,
+        time_series_profile_builder: &mut QueryTimeSeriesProfileBuilder,
+    ) {
         #[derive(Debug)]
         struct Edge {
             source_port: usize,
@@ -245,12 +257,21 @@
             for item in &pipe.items {
                 let pid = graph.node_count();
+                let time_series_profile = if let Some(scope) = pipe.scope.as_ref() {
+                    let plan_id = scope.id;
+                    let time_series_profile =
+                        time_series_profile_builder.register_time_series_profile(plan_id);
+                    Some(time_series_profile)
+                } else {
+                    None
+                };
                 let node = Node::create(
                     pid,
                     pipe.scope.clone(),
                     &item.processor,
                     &item.inputs_port,
                     &item.outputs_port,
+                    time_series_profile,
                 );
 
                 let graph_node_index = graph.add_node(node.clone());
@@ -278,6 +299,19 @@
             pipes_edges.push(pipe_edges);
         }
 
+        let query_time_series = Arc::new(time_series_profile_builder.build());
+        let node_indices: Vec<_> = graph.node_indices().collect();
+        for node_index in node_indices {
+            // We are sure that each node has only one reference in the graph here
+            let mut_node = Arc::get_mut(&mut graph[node_index]);
+            debug_assert!(
+                mut_node.is_some(),
+                "ExecutorGraph's node should only have one reference"
+            );
+            if let Some(mut_node) = mut_node {
+                mut_node.tracking_payload.time_series_profile = Some(query_time_series.clone());
+            }
+        }
         // The last pipe cannot contain any output edge.
         assert!(pipes_edges.last().map(|x| x.is_empty()).unwrap_or_default());
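End to end: `OutputPort::push_data` records into the node-local `TimeSeriesProfiles` that `init_graph` installs via the tracking payload; when a 1-second slot rolls over, the query-global counter advances, and every 1024th rollover flushes a JSON batch to the `databend::log::time_series` target. A compact sketch of that flow with the wiring done by hand instead of through the executor (not part of the diff; ids and values are made up):

```rust
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use databend_common_base::runtime::QueryTimeSeriesProfile;
use databend_common_base::runtime::QueryTimeSeriesProfileBuilder;
use databend_common_base::runtime::TimeSeriesProfileName;

fn main() {
    let mut builder = QueryTimeSeriesProfileBuilder::new("q-sketch".to_string());
    let plan_profile = builder.register_time_series_profile(7);
    let query_profile = Arc::new(builder.build());

    for _ in 0..3 {
        // Roughly what OutputPort::push_data does per block, minus the
        // thread-local payload lookup: record, and flush on every 1024th
        // slot rollover across the whole query.
        if plan_profile.record(TimeSeriesProfileName::OutputBytes, 4096)
            && QueryTimeSeriesProfile::should_flush(&query_profile.global_count)
        {
            query_profile.flush(false);
        }
        // Sleep past the 1-second slot so the next record rolls it over.
        thread::sleep(Duration::from_secs(1));
    }
    // Dropping the last reference flushes whatever is still buffered.
    drop(query_profile);
}
```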