From 2ae2c1cf2f040805a78fb6b304001532b3208d1a Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Mon, 28 Apr 2025 14:09:27 +0800 Subject: [PATCH 01/15] feat: add time series statistics profile --- Cargo.lock | 2 + src/common/base/Cargo.toml | 2 + src/common/base/src/runtime/mod.rs | 8 + .../base/src/runtime/runtime_tracker.rs | 3 + .../base/src/runtime/time_series/mod.rs | 24 +++ .../base/src/runtime/time_series/profile.rs | 177 ++++++++++++++++++ .../src/runtime/time_series/query_profile.rs | 122 ++++++++++++ src/common/base/tests/it/main.rs | 1 + src/common/base/tests/it/time_series/mod.rs | 15 ++ .../base/tests/it/time_series/profile.rs | 84 +++++++++ .../pipeline/core/src/processors/port.rs | 10 + .../src/pipelines/executor/executor_graph.rs | 39 +++- 12 files changed, 484 insertions(+), 3 deletions(-) create mode 100644 src/common/base/src/runtime/time_series/mod.rs create mode 100644 src/common/base/src/runtime/time_series/profile.rs create mode 100644 src/common/base/src/runtime/time_series/query_profile.rs create mode 100644 src/common/base/tests/it/time_series/mod.rs create mode 100644 src/common/base/tests/it/time_series/profile.rs diff --git a/Cargo.lock b/Cargo.lock index 50049c413c200..8c244b2cc3559 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2939,6 +2939,8 @@ dependencies = [ "bytemuck", "bytes", "bytesize", + "chrono", + "concurrent-queue", "crc32fast", "ctrlc", "databend-common-exception", diff --git a/src/common/base/Cargo.toml b/src/common/base/Cargo.toml index 26547eb91fee1..303ee144f82b5 100644 --- a/src/common/base/Cargo.toml +++ b/src/common/base/Cargo.toml @@ -25,6 +25,8 @@ borsh = { workspace = true } bytemuck = { workspace = true } bytes = { workspace = true } bytesize = { workspace = true } +chrono = { workspace = true } +concurrent-queue = { workspace = true } crc32fast = { workspace = true } ctrlc = { workspace = true } enquote = { workspace = true } diff --git a/src/common/base/src/runtime/mod.rs b/src/common/base/src/runtime/mod.rs index d809ac2792525..20760ab1b37b4 100644 --- a/src/common/base/src/runtime/mod.rs +++ b/src/common/base/src/runtime/mod.rs @@ -24,6 +24,7 @@ pub mod profile; mod runtime; mod runtime_tracker; mod thread; +mod time_series; pub use backtrace::dump_backtrace; pub use backtrace::get_all_tasks; @@ -60,3 +61,10 @@ pub use runtime_tracker::TrackingPayload; pub use runtime_tracker::UnlimitedFuture; pub use thread::Thread; pub use thread::ThreadJoinHandle; +pub use time_series::compress_time_point; +pub use time_series::get_time_series_profile_desc; +pub use time_series::QueryTimeSeriesProfile; +pub use time_series::QueryTimeSeriesProfileBuilder; +pub use time_series::TimeSeriesProfileDesc; +pub use time_series::TimeSeriesProfileName; +pub use time_series::TimeSeriesProfiles; diff --git a/src/common/base/src/runtime/runtime_tracker.rs b/src/common/base/src/runtime/runtime_tracker.rs index f71fe9ca2cfbd..00c15bfbe9819 100644 --- a/src/common/base/src/runtime/runtime_tracker.rs +++ b/src/common/base/src/runtime/runtime_tracker.rs @@ -55,6 +55,7 @@ use crate::runtime::memory::GlobalStatBuffer; use crate::runtime::memory::MemStat; use crate::runtime::metrics::ScopedRegistry; use crate::runtime::profile::Profile; +use crate::runtime::time_series::QueryTimeSeriesProfile; use crate::runtime::MemStatBuffer; // For implemented and needs to call drop, we cannot use the attribute tag thread local. @@ -104,6 +105,7 @@ pub struct TrackingPayload { pub mem_stat: Option>, pub metrics: Option>, pub should_log: bool, + pub time_series_profile: Option<(u32, Arc)>, } pub struct TrackingGuard { @@ -165,6 +167,7 @@ impl ThreadTracker { mem_stat: None, query_id: None, should_log: true, + time_series_profile: None, }, } } diff --git a/src/common/base/src/runtime/time_series/mod.rs b/src/common/base/src/runtime/time_series/mod.rs new file mode 100644 index 0000000000000..ac7d2929809ad --- /dev/null +++ b/src/common/base/src/runtime/time_series/mod.rs @@ -0,0 +1,24 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod profile; +mod query_profile; + +pub use profile::compress_time_point; +pub use profile::get_time_series_profile_desc; +pub use profile::TimeSeriesProfileDesc; +pub use profile::TimeSeriesProfileName; +pub use profile::TimeSeriesProfiles; +pub use query_profile::QueryTimeSeriesProfile; +pub use query_profile::QueryTimeSeriesProfileBuilder; diff --git a/src/common/base/src/runtime/time_series/profile.rs b/src/common/base/src/runtime/time_series/profile.rs new file mode 100644 index 0000000000000..7a3278e2f0242 --- /dev/null +++ b/src/common/base/src/runtime/time_series/profile.rs @@ -0,0 +1,177 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::mem; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::Arc; + +use concurrent_queue::ConcurrentQueue; +use once_cell::sync::OnceCell; +use serde::Serialize; + +// 1 second in milliseconds +const DEFAULT_INTERVAL: usize = 1000; + +// DataPoint is a tuple of (timestamp, value) +type DataPoint = (usize, usize); + +pub struct ProfilePoints { + pub points: ConcurrentQueue, + pub value: AtomicUsize, + pub last_record_timestamp: AtomicUsize, +} + +pub struct TimeSeriesProfiles { + pub profiles: Vec, +} + +pub enum TimeSeriesProfileName { + OutputRows, + OutputBytes, +} + +#[derive(Serialize)] +pub struct TimeSeriesProfileDesc { + name: &'static str, + index: u32, +} +pub static TIME_SERIES_PROFILES_DESC: OnceCell>> = OnceCell::new(); + +pub fn get_time_series_profile_desc() -> Arc> { + TIME_SERIES_PROFILES_DESC + .get_or_init(|| { + Arc::new(vec![ + TimeSeriesProfileDesc { + name: "OutputRows", + index: TimeSeriesProfileName::OutputRows as u32, + }, + TimeSeriesProfileDesc { + name: "OutputBytes", + index: TimeSeriesProfileName::OutputBytes as u32, + }, + ]) + }) + .clone() +} + +impl TimeSeriesProfiles { + pub fn new() -> Self { + let type_num = mem::variant_count::(); + TimeSeriesProfiles { + profiles: Self::create_profiles(type_num), + } + } + + fn create_profiles(type_num: usize) -> Vec { + let mut profiles = Vec::with_capacity(type_num); + for _ in 0..type_num { + profiles.push(ProfilePoints { + points: ConcurrentQueue::unbounded(), + last_record_timestamp: AtomicUsize::new(0), + value: AtomicUsize::new(0), + }); + } + profiles + } + + pub fn record(&self, name: TimeSeriesProfileName, value: usize) -> bool { + let profile = &self.profiles[name as usize]; + let now = chrono::Local::now().timestamp_millis() as usize / DEFAULT_INTERVAL; + let mut current_last_record = now; + let mut is_record = false; + loop { + match profile.last_record_timestamp.compare_exchange_weak( + current_last_record, + now, + SeqCst, + SeqCst, + ) { + Ok(_) => { + if current_last_record == 0 { + // the first time, we will record it in next time slot + break; + } + if now == current_last_record { + // still in the same slot + break; + } + let last_value = profile.value.swap(0, SeqCst); + let _ = profile.points.push((current_last_record, last_value)); + is_record = true; + break; + } + Err(last_record) => { + current_last_record = last_record; + } + } + } + profile.value.fetch_add(value, SeqCst); + is_record + } + + pub fn flush(&self, finish: bool) -> Vec<(u32, Vec>)> { + let mut batch = Vec::with_capacity(self.profiles.len()); + for (profile_name, profile) in self.profiles.iter().enumerate() { + if finish { + // if flush called by finish, we need to flush the last record + let last_value = profile.value.swap(0, SeqCst); + let _ = profile + .points + .push((profile.last_record_timestamp.load(SeqCst), last_value)); + } + let points = Vec::from_iter(profile.points.try_iter()); + batch.push((profile_name as u32, compress_time_point(&points))); + } + batch + } +} + +impl Default for TimeSeriesProfiles { + fn default() -> Self { + Self::new() + } +} + +/// Compresses a sequence of (`Vec`, i.e., a list of (timestamp, value)) +/// into a more compact format: `Vec>`. +/// +/// Compressed format description: +/// - Each `Vec` represents a segment of consecutive timestamps. +/// - The first element of each `Vec` is the starting timestamp (start_time) of the segment. +/// - The following elements are the values corresponding to each consecutive timestamp in that segment. +/// +/// Example: +/// given the original data: +/// `[(1744971865,100), (1744971866,200), (1744971867,50), (1744971868,150), (1744971870,20), (1744971871,40)]` +/// the compressed result will be: +/// `[[1744971865, 100, 200, 50, 150], [1744971870, 20, 40]]` +pub fn compress_time_point(points: &[DataPoint]) -> Vec> { + let mut result = Vec::new(); + let mut i = 0; + while i < points.len() { + let (start_time, value) = points[i]; + let mut group = Vec::new(); + group.push(start_time); + group.push(value); + let mut j = i + 1; + while j < points.len() && points[j].0 == points[j - 1].0 + 1 { + group.push(points[j].1); + j += 1; + } + result.push(group); + i = j; + } + result +} diff --git a/src/common/base/src/runtime/time_series/query_profile.rs b/src/common/base/src/runtime/time_series/query_profile.rs new file mode 100644 index 0000000000000..b263f99dda02c --- /dev/null +++ b/src/common/base/src/runtime/time_series/query_profile.rs @@ -0,0 +1,122 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::Arc; + +use log::info; +use serde::Serialize; + +use crate::runtime::get_time_series_profile_desc; +use crate::runtime::time_series::profile::TimeSeriesProfileName; +use crate::runtime::time_series::profile::TimeSeriesProfiles; +use crate::runtime::ThreadTracker; +use crate::runtime::TimeSeriesProfileDesc; + +const DEFAULT_BATCH_SIZE: usize = 1024; + +pub struct QueryTimeSeriesProfile { + pub global_count: AtomicUsize, + pub plans_profiles: Vec>, + pub query_id: String, +} + +impl QueryTimeSeriesProfile { + pub fn record_time_series_profile(name: TimeSeriesProfileName, value: usize) { + ThreadTracker::with(|x| match x.borrow().payload.time_series_profile.as_ref() { + None => {} + Some((plan_id, profile)) => { + if let Some(p) = profile.plans_profiles.get(*plan_id as usize) { + if p.record(name, value) + && profile.global_count.fetch_add(1, SeqCst) == DEFAULT_BATCH_SIZE - 1 + { + profile.flush(false); + } + } + } + }) + } + + pub fn flush(&self, finish: bool) { + #[derive(Serialize)] + struct QueryTimeSeries { + query_id: String, + plans: Vec, + desc: Arc>, + } + #[derive(Serialize)] + struct PlanTimeSeries { + plan_id: u32, + data: Vec, + } + #[derive(Serialize)] + struct ProfileTimeSeries(u32, Vec>); + let mut plans = Vec::with_capacity(self.plans_profiles.len()); + for (plan_id, plan_profile) in self.plans_profiles.iter().enumerate() { + let profile_time_series_vec: Vec = plan_profile + .flush(finish) + .into_iter() + .map(|(id, points)| ProfileTimeSeries(id, points)) + .collect(); + plans.push(PlanTimeSeries { + plan_id: plan_id as u32, + data: profile_time_series_vec, + }); + } + let query_time_series = QueryTimeSeries { + query_id: self.query_id.clone(), + plans, + desc: get_time_series_profile_desc(), + }; + let json = serde_json::to_string(&query_time_series).unwrap(); + info!(target: "databend::log::time_series", "{}", json); + } +} + +impl Drop for QueryTimeSeriesProfile { + fn drop(&mut self) { + self.flush(true); + } +} + +pub struct QueryTimeSeriesProfileBuilder { + plans_profile: BTreeMap>, + query_id: String, +} + +impl QueryTimeSeriesProfileBuilder { + pub fn new(query_id: String) -> Self { + QueryTimeSeriesProfileBuilder { + plans_profile: BTreeMap::new(), + query_id, + } + } + + pub fn register_time_series_profile(&mut self, plan_id: u32) { + if !self.plans_profile.contains_key(&plan_id) { + self.plans_profile + .insert(plan_id, Arc::new(TimeSeriesProfiles::new())); + } + } + + pub fn build(self) -> QueryTimeSeriesProfile { + QueryTimeSeriesProfile { + global_count: AtomicUsize::new(0), + plans_profiles: self.plans_profile.into_values().collect(), + query_id: self.query_id, + } + } +} diff --git a/src/common/base/tests/it/main.rs b/src/common/base/tests/it/main.rs index d40345946826f..9ec23a32818be 100644 --- a/src/common/base/tests/it/main.rs +++ b/src/common/base/tests/it/main.rs @@ -24,6 +24,7 @@ mod range_merger; mod runtime; mod stoppable; mod string; +mod time_series; // runtime tests depends on the memory stat collector. #[global_allocator] diff --git a/src/common/base/tests/it/time_series/mod.rs b/src/common/base/tests/it/time_series/mod.rs new file mode 100644 index 0000000000000..df8b6ea489d98 --- /dev/null +++ b/src/common/base/tests/it/time_series/mod.rs @@ -0,0 +1,15 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod profile; diff --git a/src/common/base/tests/it/time_series/profile.rs b/src/common/base/tests/it/time_series/profile.rs new file mode 100644 index 0000000000000..7dc6e78758f9b --- /dev/null +++ b/src/common/base/tests/it/time_series/profile.rs @@ -0,0 +1,84 @@ +use std::sync::atomic::Ordering::SeqCst; +use std::thread; +use std::time::Duration; + +use databend_common_base::runtime::compress_time_point; +use databend_common_base::runtime::TimeSeriesProfileName; +use databend_common_base::runtime::TimeSeriesProfiles; +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use databend_common_exception::Result; + +#[test] +fn test_time_series_profile_record() -> Result<()> { + let profile = TimeSeriesProfiles::new(); + + // 1. For the first record, we need only increase the inner counter + profile.record(TimeSeriesProfileName::OutputBytes, 1000); + let value = profile.profiles[TimeSeriesProfileName::OutputBytes as usize] + .value + .load(SeqCst); + let len = profile.profiles[TimeSeriesProfileName::OutputBytes as usize] + .points + .len(); + assert_eq!(value, 1000); + assert_eq!(len, 0); + + thread::sleep(Duration::from_secs(1)); + + // 2. for next time slot, we need to add a new point + // after that, we need to reset the counter + profile.record(TimeSeriesProfileName::OutputBytes, 1234); + let value = profile.profiles[TimeSeriesProfileName::OutputBytes as usize] + .value + .load(SeqCst); + let len = profile.profiles[TimeSeriesProfileName::OutputBytes as usize] + .points + .len(); + assert_eq!(value, 1234); + assert_eq!(len, 1); + // the first point should be 1000 + let first = profile.profiles[TimeSeriesProfileName::OutputBytes as usize] + .points + .pop() + .unwrap(); + assert_ne!(first.0, 0); + assert_eq!(first.1, 1000); + + Ok(()) +} +#[test] +fn test_compress_time_point_basic() { + let input = vec![ + (1744971865, 100), + (1744971866, 200), + (1744971867, 50), + (1744971868, 150), + (1744971870, 20), + (1744971871, 40), + ]; + let expected = vec![vec![1744971865, 100, 200, 50, 150], vec![ + 1744971870, 20, 40, + ]]; + assert_eq!(compress_time_point(&input), expected); +} + +#[test] +fn test_compress_time_point_no_consecutive() { + let input = vec![(1744971865, 10), (1744971867, 20), (1744971869, 30)]; + let expected = vec![vec![1744971865, 10], vec![1744971867, 20], vec![ + 1744971869, 30, + ]]; + assert_eq!(compress_time_point(&input), expected); +} diff --git a/src/query/pipeline/core/src/processors/port.rs b/src/query/pipeline/core/src/processors/port.rs index 372180ff52b43..6313f261f4dab 100644 --- a/src/query/pipeline/core/src/processors/port.rs +++ b/src/query/pipeline/core/src/processors/port.rs @@ -19,6 +19,8 @@ use std::sync::Arc; use databend_common_base::runtime::drop_guard; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; +use databend_common_base::runtime::QueryTimeSeriesProfile; +use databend_common_base::runtime::TimeSeriesProfileName; use databend_common_exception::Result; use databend_common_expression::DataBlock; @@ -234,10 +236,18 @@ impl OutputPort { ProfileStatisticsName::OutputRows, data_block.num_rows(), ); + QueryTimeSeriesProfile::record_time_series_profile( + TimeSeriesProfileName::OutputRows, + data_block.num_rows(), + ); Profile::record_usize_profile( ProfileStatisticsName::OutputBytes, data_block.memory_size(), ); + QueryTimeSeriesProfile::record_time_series_profile( + TimeSeriesProfileName::OutputBytes, + data_block.memory_size(), + ); } } diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs index c7b2c27272aeb..5031baf13a86f 100644 --- a/src/query/service/src/pipelines/executor/executor_graph.rs +++ b/src/query/service/src/pipelines/executor/executor_graph.rs @@ -28,6 +28,7 @@ use databend_common_base::base::WatchNotify; use databend_common_base::runtime::error_info::NodeErrorType; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; +use databend_common_base::runtime::QueryTimeSeriesProfileBuilder; use databend_common_base::runtime::ThreadTracker; use databend_common_base::runtime::TrackingPayload; use databend_common_base::runtime::TrySpawn; @@ -185,7 +186,7 @@ impl ExecutingGraph { finish_condvar_notify: Option, Condvar)>>, ) -> Result { let mut graph = StableGraph::new(); - Self::init_graph(&mut pipeline, &mut graph); + Self::init_graph(&mut pipeline, &mut graph, &query_id); Ok(ExecutingGraph { graph, finished_nodes: AtomicUsize::new(0), @@ -208,7 +209,7 @@ impl ExecutingGraph { let mut graph = StableGraph::new(); for pipeline in &mut pipelines { - Self::init_graph(pipeline, &mut graph); + Self::init_graph(pipeline, &mut graph, &query_id); } Ok(ExecutingGraph { @@ -224,7 +225,11 @@ impl ExecutingGraph { }) } - fn init_graph(pipeline: &mut Pipeline, graph: &mut StableGraph, EdgeInfo>) { + fn init_graph( + pipeline: &mut Pipeline, + graph: &mut StableGraph, EdgeInfo>, + query_id: &str, + ) { #[derive(Debug)] struct Edge { source_port: usize, @@ -234,6 +239,7 @@ impl ExecutingGraph { } let mut pipes_edges: Vec> = Vec::new(); + let mut time_series_builder = QueryTimeSeriesProfileBuilder::new(query_id.to_string()); for pipe in &pipeline.pipes { assert_eq!( pipe.input_length, @@ -245,6 +251,10 @@ impl ExecutingGraph { for item in &pipe.items { let pid = graph.node_count(); + if let Some(scope) = pipe.scope.as_ref() { + let plan_id = scope.id; + time_series_builder.register_time_series_profile(plan_id); + } let node = Node::create( pid, pipe.scope.clone(), @@ -278,6 +288,29 @@ impl ExecutingGraph { pipes_edges.push(pipe_edges); } + let query_time_series = Arc::new(time_series_builder.build()); + let node_indices: Vec<_> = graph.node_indices().collect(); + for node_index in node_indices { + let plan_id = { + &graph[node_index] + .tracking_payload + .profile + .as_ref() + .and_then(|x| x.plan_id) + }; + if let Some(plan_id) = plan_id { + // we are sure that the node is only have one reference in the graph + let mut_node = Arc::get_mut(&mut graph[node_index]); + debug_assert!( + mut_node.is_some(), + "ExecutorGraph's node should only have one reference" + ); + if let Some(mut_node) = mut_node { + mut_node.tracking_payload.time_series_profile = + Some((*plan_id, query_time_series.clone())); + } + } + } // The last pipe cannot contain any output edge. assert!(pipes_edges.last().map(|x| x.is_empty()).unwrap_or_default()); From a51c4b98b7fea986e6d9d9ce4b0f59f83ef3756b Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Mon, 28 Apr 2025 14:29:56 +0800 Subject: [PATCH 02/15] chore: add quota for log --- src/common/base/src/runtime/time_series/profile.rs | 14 ++++++++++++-- .../base/src/runtime/time_series/query_profile.rs | 6 +++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/src/common/base/src/runtime/time_series/profile.rs b/src/common/base/src/runtime/time_series/profile.rs index 7a3278e2f0242..ed2074712e10c 100644 --- a/src/common/base/src/runtime/time_series/profile.rs +++ b/src/common/base/src/runtime/time_series/profile.rs @@ -121,9 +121,12 @@ impl TimeSeriesProfiles { is_record } - pub fn flush(&self, finish: bool) -> Vec<(u32, Vec>)> { + pub fn flush(&self, finish: bool, quota: &mut usize) -> Vec<(u32, Vec>)> { let mut batch = Vec::with_capacity(self.profiles.len()); for (profile_name, profile) in self.profiles.iter().enumerate() { + if *quota <= 0 && !finish { + break; + } if finish { // if flush called by finish, we need to flush the last record let last_value = profile.value.swap(0, SeqCst); @@ -131,7 +134,14 @@ impl TimeSeriesProfiles { .points .push((profile.last_record_timestamp.load(SeqCst), last_value)); } - let points = Vec::from_iter(profile.points.try_iter()); + let mut points = Vec::with_capacity(profile.points.len()); + while let Ok(point) = profile.points.pop() { + points.push(point); + *quota -= 1; + if *quota <= 0 && !finish { + break; + } + } batch.push((profile_name as u32, compress_time_point(&points))); } batch diff --git a/src/common/base/src/runtime/time_series/query_profile.rs b/src/common/base/src/runtime/time_series/query_profile.rs index b263f99dda02c..3219aeec5de15 100644 --- a/src/common/base/src/runtime/time_series/query_profile.rs +++ b/src/common/base/src/runtime/time_series/query_profile.rs @@ -64,10 +64,14 @@ impl QueryTimeSeriesProfile { } #[derive(Serialize)] struct ProfileTimeSeries(u32, Vec>); + let mut quota = DEFAULT_BATCH_SIZE; let mut plans = Vec::with_capacity(self.plans_profiles.len()); for (plan_id, plan_profile) in self.plans_profiles.iter().enumerate() { + if quota <= 0 && !finish { + break; + } let profile_time_series_vec: Vec = plan_profile - .flush(finish) + .flush(finish, &mut quota) .into_iter() .map(|(id, points)| ProfileTimeSeries(id, points)) .collect(); From c2d2f4f5b303981f930efca7ec72325ce833cf86 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Mon, 28 Apr 2025 14:45:53 +0800 Subject: [PATCH 03/15] chore: add quota for log --- src/common/base/src/runtime/time_series/profile.rs | 6 +++--- src/common/base/src/runtime/time_series/query_profile.rs | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/common/base/src/runtime/time_series/profile.rs b/src/common/base/src/runtime/time_series/profile.rs index ed2074712e10c..3f1cafb02d6e7 100644 --- a/src/common/base/src/runtime/time_series/profile.rs +++ b/src/common/base/src/runtime/time_series/profile.rs @@ -121,10 +121,10 @@ impl TimeSeriesProfiles { is_record } - pub fn flush(&self, finish: bool, quota: &mut usize) -> Vec<(u32, Vec>)> { + pub fn flush(&self, finish: bool, quota: &mut i32) -> Vec<(u32, Vec>)> { let mut batch = Vec::with_capacity(self.profiles.len()); for (profile_name, profile) in self.profiles.iter().enumerate() { - if *quota <= 0 && !finish { + if *quota == 0 && !finish { break; } if finish { @@ -138,7 +138,7 @@ impl TimeSeriesProfiles { while let Ok(point) = profile.points.pop() { points.push(point); *quota -= 1; - if *quota <= 0 && !finish { + if *quota == 0 && !finish { break; } } diff --git a/src/common/base/src/runtime/time_series/query_profile.rs b/src/common/base/src/runtime/time_series/query_profile.rs index 3219aeec5de15..470dff6633015 100644 --- a/src/common/base/src/runtime/time_series/query_profile.rs +++ b/src/common/base/src/runtime/time_series/query_profile.rs @@ -64,10 +64,10 @@ impl QueryTimeSeriesProfile { } #[derive(Serialize)] struct ProfileTimeSeries(u32, Vec>); - let mut quota = DEFAULT_BATCH_SIZE; + let mut quota = DEFAULT_BATCH_SIZE as i32; let mut plans = Vec::with_capacity(self.plans_profiles.len()); for (plan_id, plan_profile) in self.plans_profiles.iter().enumerate() { - if quota <= 0 && !finish { + if quota == 0 && !finish { break; } let profile_time_series_vec: Vec = plan_profile From e9d5d7d437ab3f7b4136c62c9f82a21985afd70e Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Tue, 29 Apr 2025 10:33:09 +0800 Subject: [PATCH 04/15] chore: add more tests --- .../base/tests/it/time_series/profile.rs | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/common/base/tests/it/time_series/profile.rs b/src/common/base/tests/it/time_series/profile.rs index 7dc6e78758f9b..a9c7e01128e91 100644 --- a/src/common/base/tests/it/time_series/profile.rs +++ b/src/common/base/tests/it/time_series/profile.rs @@ -82,3 +82,25 @@ fn test_compress_time_point_no_consecutive() { ]]; assert_eq!(compress_time_point(&input), expected); } + +#[test] +fn test_finish_flush() { + let profile = TimeSeriesProfiles::new(); + + profile.record(TimeSeriesProfileName::OutputBytes, 2000); + profile.record(TimeSeriesProfileName::OutputRows, 1000); + + thread::sleep(Duration::from_secs(1)); + + // Finish flush will read this from `profile.value` and append it to the points + profile.record(TimeSeriesProfileName::OutputBytes, 2); + profile.record(TimeSeriesProfileName::OutputRows, 1); + + let batch = profile.flush(true, &mut 4); + + assert_eq!(batch.len(), 2); + // [[timestamp, 1000, 1]] + assert_eq!(batch[0].1[0].len(), 3); + // [[timestamp, 2000, 2]] + assert_eq!(batch[1].1[0].len(), 3); +} From 89f9dd6006daefcdc5b02800af955c8c9ba0b645 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Tue, 29 Apr 2025 10:39:28 +0800 Subject: [PATCH 05/15] chore: disable in default --- src/common/tracing/src/init.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/common/tracing/src/init.rs b/src/common/tracing/src/init.rs index f75d2d3def2cc..b3b8db02b55d0 100644 --- a/src/common/tracing/src/init.rs +++ b/src/common/tracing/src/init.rs @@ -108,6 +108,7 @@ fn env_filter(level: &str) -> EnvFilter { .filter(Some("databend::log::query"), LevelFilter::Off) .filter(Some("databend::log::profile"), LevelFilter::Off) .filter(Some("databend::log::structlog"), LevelFilter::Off) + .filter(Some("databend::log::time_series"), LevelFilter::Off) .parse(level), ) } From 5154cfe5e64383a47816138de3cf2aa235132da0 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Tue, 29 Apr 2025 17:25:12 +0800 Subject: [PATCH 06/15] fix: concurrent situation caused problem --- src/common/base/src/runtime/mod.rs | 1 + .../base/src/runtime/time_series/mod.rs | 1 + .../base/src/runtime/time_series/profile.rs | 120 ++++++++++++------ .../base/tests/it/time_series/profile.rs | 67 ++++++++++ 4 files changed, 147 insertions(+), 42 deletions(-) diff --git a/src/common/base/src/runtime/mod.rs b/src/common/base/src/runtime/mod.rs index 20760ab1b37b4..3776957ab6ecd 100644 --- a/src/common/base/src/runtime/mod.rs +++ b/src/common/base/src/runtime/mod.rs @@ -63,6 +63,7 @@ pub use thread::Thread; pub use thread::ThreadJoinHandle; pub use time_series::compress_time_point; pub use time_series::get_time_series_profile_desc; +pub use time_series::ProfilePoints; pub use time_series::QueryTimeSeriesProfile; pub use time_series::QueryTimeSeriesProfileBuilder; pub use time_series::TimeSeriesProfileDesc; diff --git a/src/common/base/src/runtime/time_series/mod.rs b/src/common/base/src/runtime/time_series/mod.rs index ac7d2929809ad..f7d2c677ce2a9 100644 --- a/src/common/base/src/runtime/time_series/mod.rs +++ b/src/common/base/src/runtime/time_series/mod.rs @@ -18,6 +18,7 @@ mod query_profile; pub use profile::compress_time_point; pub use profile::get_time_series_profile_desc; pub use profile::TimeSeriesProfileDesc; +pub use profile::ProfilePoints; pub use profile::TimeSeriesProfileName; pub use profile::TimeSeriesProfiles; pub use query_profile::QueryTimeSeriesProfile; diff --git a/src/common/base/src/runtime/time_series/profile.rs b/src/common/base/src/runtime/time_series/profile.rs index 3f1cafb02d6e7..931267486028d 100644 --- a/src/common/base/src/runtime/time_series/profile.rs +++ b/src/common/base/src/runtime/time_series/profile.rs @@ -27,16 +27,6 @@ const DEFAULT_INTERVAL: usize = 1000; // DataPoint is a tuple of (timestamp, value) type DataPoint = (usize, usize); -pub struct ProfilePoints { - pub points: ConcurrentQueue, - pub value: AtomicUsize, - pub last_record_timestamp: AtomicUsize, -} - -pub struct TimeSeriesProfiles { - pub profiles: Vec, -} - pub enum TimeSeriesProfileName { OutputRows, OutputBytes, @@ -66,58 +56,98 @@ pub fn get_time_series_profile_desc() -> Arc> { .clone() } -impl TimeSeriesProfiles { - pub fn new() -> Self { - let type_num = mem::variant_count::(); - TimeSeriesProfiles { - profiles: Self::create_profiles(type_num), - } - } +pub struct ProfilePoints { + pub points: ConcurrentQueue, + pub value: AtomicUsize, + pub last_check_timestamp: AtomicUsize, +} - fn create_profiles(type_num: usize) -> Vec { - let mut profiles = Vec::with_capacity(type_num); - for _ in 0..type_num { - profiles.push(ProfilePoints { - points: ConcurrentQueue::unbounded(), - last_record_timestamp: AtomicUsize::new(0), - value: AtomicUsize::new(0), - }); +pub struct TimeSeriesProfiles { + pub profiles: Vec, +} + +impl ProfilePoints { + pub fn new() -> Self { + ProfilePoints { + points: ConcurrentQueue::unbounded(), + last_check_timestamp: AtomicUsize::new(0), + value: AtomicUsize::new(0), } - profiles } - - pub fn record(&self, name: TimeSeriesProfileName, value: usize) -> bool { - let profile = &self.profiles[name as usize]; - let now = chrono::Local::now().timestamp_millis() as usize / DEFAULT_INTERVAL; - let mut current_last_record = now; + pub fn record_time_slot(&self, now: usize, value: usize) -> bool { let mut is_record = false; + let mut current_last_check = 0; loop { - match profile.last_record_timestamp.compare_exchange_weak( - current_last_record, + match self.last_check_timestamp.compare_exchange_weak( + current_last_check, now, SeqCst, SeqCst, ) { Ok(_) => { - if current_last_record == 0 { + if current_last_check == 0 { // the first time, we will record it in next time slot break; } - if now == current_last_record { + if now == current_last_check { // still in the same slot break; } - let last_value = profile.value.swap(0, SeqCst); - let _ = profile.points.push((current_last_record, last_value)); + let last_value = self.value.swap(0, SeqCst); + let _ = self.points.push((current_last_check, last_value)); is_record = true; break; } Err(last_record) => { - current_last_record = last_record; + if now < last_record { + // for concurrent situation, now could be earlier than last_record + // that means we are missing the time slot, it is already push into + // the points queue. We just need to push a same timestamp tuple + // we will merge them in the flush + if now == last_record - 1 { + let _ = self.points.push((now, value)); + // should avoid adding value into this time slot + return true; + } else { + // missing the time slot for 2 seconds or more, it cannot happen + return false; + } + } + current_last_check = last_record; } } } - profile.value.fetch_add(value, SeqCst); + self.value.fetch_add(value, SeqCst); + is_record + } +} + +impl Default for ProfilePoints { + fn default() -> Self { + Self::new() + } +} + +impl TimeSeriesProfiles { + pub fn new() -> Self { + let type_num = mem::variant_count::(); + TimeSeriesProfiles { + profiles: Self::create_profiles(type_num), + } + } + + fn create_profiles(type_num: usize) -> Vec { + let mut profiles = Vec::with_capacity(type_num); + for _ in 0..type_num { + profiles.push(ProfilePoints::new()); + } + profiles + } + + pub fn record(&self, name: TimeSeriesProfileName, value: usize) -> bool { + let profile = &self.profiles[name as usize]; + let now = chrono::Local::now().timestamp_millis() as usize / DEFAULT_INTERVAL; + let is_record = profile.record_time_slot(now, value); is_record } @@ -132,7 +162,7 @@ impl TimeSeriesProfiles { let last_value = profile.value.swap(0, SeqCst); let _ = profile .points - .push((profile.last_record_timestamp.load(SeqCst), last_value)); + .push((profile.last_check_timestamp.load(SeqCst), last_value)); } let mut points = Vec::with_capacity(profile.points.len()); while let Ok(point) = profile.points.pop() { @@ -176,8 +206,14 @@ pub fn compress_time_point(points: &[DataPoint]) -> Vec> { group.push(start_time); group.push(value); let mut j = i + 1; - while j < points.len() && points[j].0 == points[j - 1].0 + 1 { - group.push(points[j].1); + while j < points.len() + && (points[j].0 == points[j - 1].0 + 1 || points[j].0 == points[j - 1].0) + { + let mut v = points[j].1; + if points[j].0 == points[j - 1].0 { + v += group.pop().unwrap(); + } + group.push(v); j += 1; } result.push(group); diff --git a/src/common/base/tests/it/time_series/profile.rs b/src/common/base/tests/it/time_series/profile.rs index a9c7e01128e91..1e0f2977f6a28 100644 --- a/src/common/base/tests/it/time_series/profile.rs +++ b/src/common/base/tests/it/time_series/profile.rs @@ -3,6 +3,7 @@ use std::thread; use std::time::Duration; use databend_common_base::runtime::compress_time_point; +use databend_common_base::runtime::ProfilePoints; use databend_common_base::runtime::TimeSeriesProfileName; use databend_common_base::runtime::TimeSeriesProfiles; // Copyright 2021 Datafuse Labs @@ -74,6 +75,23 @@ fn test_compress_time_point_basic() { assert_eq!(compress_time_point(&input), expected); } +#[test] +fn test_compress_time_point_special_duplicate() { + let input = vec![ + (1744971865, 100), + (1744971866, 200), + (1744971867, 50), + (1744971867, 123), + (1744971868, 150), + (1744971870, 20), + (1744971871, 40), + ]; + let expected = vec![vec![1744971865, 100, 200, 50 + 123, 150], vec![ + 1744971870, 20, 40, + ]]; + assert_eq!(compress_time_point(&input), expected); +} + #[test] fn test_compress_time_point_no_consecutive() { let input = vec![(1744971865, 10), (1744971867, 20), (1744971869, 30)]; @@ -104,3 +122,52 @@ fn test_finish_flush() { // [[timestamp, 2000, 2]] assert_eq!(batch[1].1[0].len(), 3); } + +#[test] +fn test_record_inner_basic() -> Result<()> { + let points = ProfilePoints::new(); + let now = chrono::Utc::now().timestamp() as usize; + + // Simulate recording in the same time slot + for i in 0..10 { + points.record_time_slot(now, i); + } + assert_eq!(points.points.len(), 0); + assert_eq!(points.value.load(SeqCst), (0..10).sum::()); + + // Next time slot + for i in 0..100 { + points.record_time_slot(now + 1, i); + } + assert_eq!(points.points.len(), 1); + let x = points.points.pop().unwrap(); + assert_eq!(x.0, now); + assert_eq!(x.1, (0..10).sum::()); + assert_eq!(points.value.load(SeqCst), (0..100).sum::()); + Ok(()) +} + +#[test] +fn test_record_inner_special() -> Result<()> { + // Simulate concurrently recording but one thread is late + + let points = ProfilePoints::new(); + let now = 1000000001 as usize; + for i in 0..10 { + points.record_time_slot(now, i); + } + points.record_time_slot(now + 1, 123); + points.record_time_slot(now, 456); + points.record_time_slot(now + 2, 789); + + let v = points.points.try_iter().collect::>(); + + assert_eq!( + format!("{:?}", v), + "[(1000000001, 45), (1000000001, 456), (1000000002, 123)]" + ); + + assert_eq!(points.value.load(SeqCst), 789); + + Ok(()) +} From 527059cdf3158114b638651bf44a1272311d67f2 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Tue, 29 Apr 2025 18:30:34 +0800 Subject: [PATCH 07/15] fix: concurrent situation caused problem --- .../base/src/runtime/runtime_tracker.rs | 5 +- .../base/src/runtime/time_series/profile.rs | 29 ++++++----- .../src/runtime/time_series/query_profile.rs | 46 +++++++++-------- .../base/tests/it/time_series/profile.rs | 51 +++++++++++++++++-- .../src/pipelines/executor/executor_graph.rs | 41 ++++++++------- 5 files changed, 111 insertions(+), 61 deletions(-) diff --git a/src/common/base/src/runtime/runtime_tracker.rs b/src/common/base/src/runtime/runtime_tracker.rs index 00c15bfbe9819..a1552bfed6457 100644 --- a/src/common/base/src/runtime/runtime_tracker.rs +++ b/src/common/base/src/runtime/runtime_tracker.rs @@ -57,6 +57,7 @@ use crate::runtime::metrics::ScopedRegistry; use crate::runtime::profile::Profile; use crate::runtime::time_series::QueryTimeSeriesProfile; use crate::runtime::MemStatBuffer; +use crate::runtime::TimeSeriesProfiles; // For implemented and needs to call drop, we cannot use the attribute tag thread local. // https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=ea33533387d401e86423df1a764b5609 @@ -105,7 +106,8 @@ pub struct TrackingPayload { pub mem_stat: Option>, pub metrics: Option>, pub should_log: bool, - pub time_series_profile: Option<(u32, Arc)>, + pub time_series_profile: Option>, + pub local_time_series_profile: Option>, } pub struct TrackingGuard { @@ -168,6 +170,7 @@ impl ThreadTracker { query_id: None, should_log: true, time_series_profile: None, + local_time_series_profile: None, }, } } diff --git a/src/common/base/src/runtime/time_series/profile.rs b/src/common/base/src/runtime/time_series/profile.rs index 931267486028d..14e2b066e4ce3 100644 --- a/src/common/base/src/runtime/time_series/profile.rs +++ b/src/common/base/src/runtime/time_series/profile.rs @@ -100,18 +100,13 @@ impl ProfilePoints { } Err(last_record) => { if now < last_record { - // for concurrent situation, now could be earlier than last_record + // for concurrent situation, `now` could be earlier than `last_record` // that means we are missing the time slot, it is already push into - // the points queue. We just need to push a same timestamp tuple - // we will merge them in the flush - if now == last_record - 1 { - let _ = self.points.push((now, value)); - // should avoid adding value into this time slot - return true; - } else { - // missing the time slot for 2 seconds or more, it cannot happen - return false; - } + // the points queue. We just need to push the value into the queue again. + // will merge them in the flush + let _ = self.points.push((now, value)); + // early return, should avoid adding value into this time slot + return true; } current_last_check = last_record; } @@ -151,9 +146,9 @@ impl TimeSeriesProfiles { is_record } - pub fn flush(&self, finish: bool, quota: &mut i32) -> Vec<(u32, Vec>)> { + pub fn flush(&self, finish: bool, quota: &mut i32) -> Vec>> { let mut batch = Vec::with_capacity(self.profiles.len()); - for (profile_name, profile) in self.profiles.iter().enumerate() { + for profile in self.profiles.iter() { if *quota == 0 && !finish { break; } @@ -172,7 +167,7 @@ impl TimeSeriesProfiles { break; } } - batch.push((profile_name as u32, compress_time_point(&points))); + batch.push(compress_time_point(&points)); } batch } @@ -197,6 +192,12 @@ impl Default for TimeSeriesProfiles { /// `[(1744971865,100), (1744971866,200), (1744971867,50), (1744971868,150), (1744971870,20), (1744971871,40)]` /// the compressed result will be: /// `[[1744971865, 100, 200, 50, 150], [1744971870, 20, 40]]` +/// +/// Note: +/// Why convert to `[timestamp, value0, value1, value2]` instead of `[timestamp, (value0, value1, value2)]`: +/// Rust serde_json will convert a tuple to a list. [timestamp, (value0, value1, value2)] will be converted to +/// `[timestamp, value0, value1, value2]` after serialization. +/// See: https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=3c153dfcfdde3032c80c05f4010f3d0f pub fn compress_time_point(points: &[DataPoint]) -> Vec> { let mut result = Vec::new(); let mut i = 0; diff --git a/src/common/base/src/runtime/time_series/query_profile.rs b/src/common/base/src/runtime/time_series/query_profile.rs index 470dff6633015..0452b45372124 100644 --- a/src/common/base/src/runtime/time_series/query_profile.rs +++ b/src/common/base/src/runtime/time_series/query_profile.rs @@ -36,18 +36,25 @@ pub struct QueryTimeSeriesProfile { impl QueryTimeSeriesProfile { pub fn record_time_series_profile(name: TimeSeriesProfileName, value: usize) { - ThreadTracker::with(|x| match x.borrow().payload.time_series_profile.as_ref() { - None => {} - Some((plan_id, profile)) => { - if let Some(p) = profile.plans_profiles.get(*plan_id as usize) { - if p.record(name, value) - && profile.global_count.fetch_add(1, SeqCst) == DEFAULT_BATCH_SIZE - 1 - { - profile.flush(false); + ThreadTracker::with( + |x| match x.borrow().payload.local_time_series_profile.as_ref() { + None => {} + Some(profile) => { + if profile.record(name, value) { + if let Some(global_profile) = + x.borrow().payload.time_series_profile.as_ref() + { + if global_profile.global_count.fetch_add(1, SeqCst) + == DEFAULT_BATCH_SIZE - 1 + { + // flush the global profile + global_profile.flush(false); + } + } } } - } - }) + }, + ) } pub fn flush(&self, finish: bool) { @@ -60,21 +67,15 @@ impl QueryTimeSeriesProfile { #[derive(Serialize)] struct PlanTimeSeries { plan_id: u32, - data: Vec, + data: Vec>>, } - #[derive(Serialize)] - struct ProfileTimeSeries(u32, Vec>); let mut quota = DEFAULT_BATCH_SIZE as i32; let mut plans = Vec::with_capacity(self.plans_profiles.len()); for (plan_id, plan_profile) in self.plans_profiles.iter().enumerate() { if quota == 0 && !finish { break; } - let profile_time_series_vec: Vec = plan_profile - .flush(finish, &mut quota) - .into_iter() - .map(|(id, points)| ProfileTimeSeries(id, points)) - .collect(); + let profile_time_series_vec = plan_profile.flush(finish, &mut quota); plans.push(PlanTimeSeries { plan_id: plan_id as u32, data: profile_time_series_vec, @@ -109,10 +110,13 @@ impl QueryTimeSeriesProfileBuilder { } } - pub fn register_time_series_profile(&mut self, plan_id: u32) { + pub fn register_time_series_profile(&mut self, plan_id: u32) -> Arc { if !self.plans_profile.contains_key(&plan_id) { - self.plans_profile - .insert(plan_id, Arc::new(TimeSeriesProfiles::new())); + let profile = Arc::new(TimeSeriesProfiles::new()); + self.plans_profile.insert(plan_id, profile.clone()); + profile + } else { + self.plans_profile.get(&plan_id).unwrap().clone() } } diff --git a/src/common/base/tests/it/time_series/profile.rs b/src/common/base/tests/it/time_series/profile.rs index 1e0f2977f6a28..6468b185e9f74 100644 --- a/src/common/base/tests/it/time_series/profile.rs +++ b/src/common/base/tests/it/time_series/profile.rs @@ -76,10 +76,11 @@ fn test_compress_time_point_basic() { } #[test] -fn test_compress_time_point_special_duplicate() { +fn test_compress_time_point_special_duplicate_merge() { let input = vec![ (1744971865, 100), (1744971866, 200), + // same timestamp, we will merge (1744971867, 50), (1744971867, 123), (1744971868, 150), @@ -92,6 +93,27 @@ fn test_compress_time_point_special_duplicate() { assert_eq!(compress_time_point(&input), expected); } +#[test] +fn test_compress_time_point_special_invalid() { + let input = vec![ + (1744971865, 100), + (1744971866, 200), + (1744971867, 50), + // extreme case, we not merge it + (1744971865, 123), + (1744971868, 150), + (1744971870, 20), + (1744971871, 40), + ]; + let expected = vec![ + vec![1744971865, 100, 200, 50], + vec![1744971865, 123], + vec![1744971868, 150], + vec![1744971870, 20, 40], + ]; + assert_eq!(compress_time_point(&input), expected); +} + #[test] fn test_compress_time_point_no_consecutive() { let input = vec![(1744971865, 10), (1744971867, 20), (1744971869, 30)]; @@ -118,9 +140,9 @@ fn test_finish_flush() { assert_eq!(batch.len(), 2); // [[timestamp, 1000, 1]] - assert_eq!(batch[0].1[0].len(), 3); + assert_eq!(batch[0][0].len(), 3); // [[timestamp, 2000, 2]] - assert_eq!(batch[1].1[0].len(), 3); + assert_eq!(batch[1][0].len(), 3); } #[test] @@ -150,7 +172,6 @@ fn test_record_inner_basic() -> Result<()> { #[test] fn test_record_inner_special() -> Result<()> { // Simulate concurrently recording but one thread is late - let points = ProfilePoints::new(); let now = 1000000001 as usize; for i in 0..10 { @@ -168,6 +189,28 @@ fn test_record_inner_special() -> Result<()> { ); assert_eq!(points.value.load(SeqCst), 789); + Ok(()) +} +#[test] +fn test_record_inner_special_invalid() -> Result<()> { + // Simulate concurrently recording but one thread is later than 1 second + let points = ProfilePoints::new(); + let now = 1000000001 as usize; + for i in 0..10 { + points.record_time_slot(now, i); + } + points.record_time_slot(now + 1, 123); + points.record_time_slot(now - 2, 456); + points.record_time_slot(now + 2, 789); + + let v = points.points.try_iter().collect::>(); + + assert_eq!( + format!("{:?}", v), + "[(1000000001, 45), (999999999, 456), (1000000002, 123)]" + ); + + assert_eq!(points.value.load(SeqCst), 789); Ok(()) } diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs index 5031baf13a86f..1113bf3ea9c2f 100644 --- a/src/query/service/src/pipelines/executor/executor_graph.rs +++ b/src/query/service/src/pipelines/executor/executor_graph.rs @@ -30,6 +30,7 @@ use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; use databend_common_base::runtime::QueryTimeSeriesProfileBuilder; use databend_common_base::runtime::ThreadTracker; +use databend_common_base::runtime::TimeSeriesProfiles; use databend_common_base::runtime::TrackingPayload; use databend_common_base::runtime::TrySpawn; use databend_common_exception::ErrorCode; @@ -99,6 +100,7 @@ impl Node { processor: &ProcessorPtr, inputs_port: &[Arc], outputs_port: &[Arc], + time_series_profile: Option>, ) -> Arc { let p_name = unsafe { processor.name() }; let tracking_payload = { @@ -125,6 +127,8 @@ impl Node { // Node tracking metrics tracking_payload.metrics = scope.as_ref().map(|x| x.metrics_registry.clone()); + tracking_payload.local_time_series_profile = time_series_profile; + tracking_payload }; @@ -251,16 +255,21 @@ impl ExecutingGraph { for item in &pipe.items { let pid = graph.node_count(); - if let Some(scope) = pipe.scope.as_ref() { + let time_series_profile = if let Some(scope) = pipe.scope.as_ref() { let plan_id = scope.id; - time_series_builder.register_time_series_profile(plan_id); - } + let time_series_profile = + time_series_builder.register_time_series_profile(plan_id); + Some(time_series_profile) + } else { + None + }; let node = Node::create( pid, pipe.scope.clone(), &item.processor, &item.inputs_port, &item.outputs_port, + time_series_profile, ); let graph_node_index = graph.add_node(node.clone()); @@ -291,24 +300,14 @@ impl ExecutingGraph { let query_time_series = Arc::new(time_series_builder.build()); let node_indices: Vec<_> = graph.node_indices().collect(); for node_index in node_indices { - let plan_id = { - &graph[node_index] - .tracking_payload - .profile - .as_ref() - .and_then(|x| x.plan_id) - }; - if let Some(plan_id) = plan_id { - // we are sure that the node is only have one reference in the graph - let mut_node = Arc::get_mut(&mut graph[node_index]); - debug_assert!( - mut_node.is_some(), - "ExecutorGraph's node should only have one reference" - ); - if let Some(mut_node) = mut_node { - mut_node.tracking_payload.time_series_profile = - Some((*plan_id, query_time_series.clone())); - } + // we are sure that the node is only have one reference in the graph + let mut_node = Arc::get_mut(&mut graph[node_index]); + debug_assert!( + mut_node.is_some(), + "ExecutorGraph's node should only have one reference" + ); + if let Some(mut_node) = mut_node { + mut_node.tracking_payload.time_series_profile = Some(query_time_series.clone()); } } From b34650a97e6e28056f4ab6dbe048c1bcb2c40bca Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Tue, 29 Apr 2025 21:18:49 +0800 Subject: [PATCH 08/15] fix: concurrent situation caused problem --- .../base/src/runtime/time_series/profile.rs | 7 ++++--- .../src/runtime/time_series/query_profile.rs | 20 +++++++++++++------ .../src/pipelines/executor/executor_graph.rs | 16 ++++++++------- 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/src/common/base/src/runtime/time_series/profile.rs b/src/common/base/src/runtime/time_series/profile.rs index 14e2b066e4ce3..da2237682e0bd 100644 --- a/src/common/base/src/runtime/time_series/profile.rs +++ b/src/common/base/src/runtime/time_series/profile.rs @@ -154,10 +154,11 @@ impl TimeSeriesProfiles { } if finish { // if flush called by finish, we need to flush the last record + let last_timestamp = profile.last_check_timestamp.load(SeqCst); let last_value = profile.value.swap(0, SeqCst); - let _ = profile - .points - .push((profile.last_check_timestamp.load(SeqCst), last_value)); + if last_value != 0 && last_timestamp != 0 { + let _ = profile.points.push((last_timestamp, last_value)); + } } let mut points = Vec::with_capacity(profile.points.len()); while let Ok(point) = profile.points.pop() { diff --git a/src/common/base/src/runtime/time_series/query_profile.rs b/src/common/base/src/runtime/time_series/query_profile.rs index 0452b45372124..a7dd842b0bf85 100644 --- a/src/common/base/src/runtime/time_series/query_profile.rs +++ b/src/common/base/src/runtime/time_series/query_profile.rs @@ -30,7 +30,7 @@ const DEFAULT_BATCH_SIZE: usize = 1024; pub struct QueryTimeSeriesProfile { pub global_count: AtomicUsize, - pub plans_profiles: Vec>, + pub plans_profiles: Vec<(u32, Arc)>, pub query_id: String, } @@ -47,6 +47,7 @@ impl QueryTimeSeriesProfile { if global_profile.global_count.fetch_add(1, SeqCst) == DEFAULT_BATCH_SIZE - 1 { + global_profile.global_count.store(0, SeqCst); // flush the global profile global_profile.flush(false); } @@ -71,16 +72,19 @@ impl QueryTimeSeriesProfile { } let mut quota = DEFAULT_BATCH_SIZE as i32; let mut plans = Vec::with_capacity(self.plans_profiles.len()); - for (plan_id, plan_profile) in self.plans_profiles.iter().enumerate() { + for (plan_id, plan_profile) in self.plans_profiles.iter() { if quota == 0 && !finish { break; } let profile_time_series_vec = plan_profile.flush(finish, &mut quota); plans.push(PlanTimeSeries { - plan_id: plan_id as u32, + plan_id: *plan_id, data: profile_time_series_vec, }); } + if quota == DEFAULT_BATCH_SIZE as i32 { + return; + } let query_time_series = QueryTimeSeries { query_id: self.query_id.clone(), plans, @@ -120,11 +124,15 @@ impl QueryTimeSeriesProfileBuilder { } } - pub fn build(self) -> QueryTimeSeriesProfile { + pub fn build(&self) -> QueryTimeSeriesProfile { QueryTimeSeriesProfile { global_count: AtomicUsize::new(0), - plans_profiles: self.plans_profile.into_values().collect(), - query_id: self.query_id, + plans_profiles: self + .plans_profile + .iter() + .map(|(k, v)| (*k, v.clone())) + .collect(), + query_id: self.query_id.clone(), } } } diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs index 1113bf3ea9c2f..b0d2badef785f 100644 --- a/src/query/service/src/pipelines/executor/executor_graph.rs +++ b/src/query/service/src/pipelines/executor/executor_graph.rs @@ -190,7 +190,9 @@ impl ExecutingGraph { finish_condvar_notify: Option, Condvar)>>, ) -> Result { let mut graph = StableGraph::new(); - Self::init_graph(&mut pipeline, &mut graph, &query_id); + let mut time_series_profile_builder = + QueryTimeSeriesProfileBuilder::new(query_id.to_string()); + Self::init_graph(&mut pipeline, &mut graph, &mut time_series_profile_builder); Ok(ExecutingGraph { graph, finished_nodes: AtomicUsize::new(0), @@ -211,9 +213,10 @@ impl ExecutingGraph { finish_condvar_notify: Option, Condvar)>>, ) -> Result { let mut graph = StableGraph::new(); - + let mut time_series_profile_builder = + QueryTimeSeriesProfileBuilder::new(query_id.to_string()); for pipeline in &mut pipelines { - Self::init_graph(pipeline, &mut graph, &query_id); + Self::init_graph(pipeline, &mut graph, &mut time_series_profile_builder); } Ok(ExecutingGraph { @@ -232,7 +235,7 @@ impl ExecutingGraph { fn init_graph( pipeline: &mut Pipeline, graph: &mut StableGraph, EdgeInfo>, - query_id: &str, + time_series_profile_builder: &mut QueryTimeSeriesProfileBuilder, ) { #[derive(Debug)] struct Edge { @@ -243,7 +246,6 @@ impl ExecutingGraph { } let mut pipes_edges: Vec> = Vec::new(); - let mut time_series_builder = QueryTimeSeriesProfileBuilder::new(query_id.to_string()); for pipe in &pipeline.pipes { assert_eq!( pipe.input_length, @@ -258,7 +260,7 @@ impl ExecutingGraph { let time_series_profile = if let Some(scope) = pipe.scope.as_ref() { let plan_id = scope.id; let time_series_profile = - time_series_builder.register_time_series_profile(plan_id); + time_series_profile_builder.register_time_series_profile(plan_id); Some(time_series_profile) } else { None @@ -297,7 +299,7 @@ impl ExecutingGraph { pipes_edges.push(pipe_edges); } - let query_time_series = Arc::new(time_series_builder.build()); + let query_time_series = Arc::new(time_series_profile_builder.build()); let node_indices: Vec<_> = graph.node_indices().collect(); for node_index in node_indices { // we are sure that the node is only have one reference in the graph From f529a6158222c7b6f91d3eaca55945a8d0b9bb88 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Tue, 29 Apr 2025 21:26:24 +0800 Subject: [PATCH 09/15] fix: concurrent situation caused problem --- src/common/base/src/runtime/time_series/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/base/src/runtime/time_series/mod.rs b/src/common/base/src/runtime/time_series/mod.rs index f7d2c677ce2a9..8fe3e1455557b 100644 --- a/src/common/base/src/runtime/time_series/mod.rs +++ b/src/common/base/src/runtime/time_series/mod.rs @@ -17,8 +17,8 @@ mod query_profile; pub use profile::compress_time_point; pub use profile::get_time_series_profile_desc; -pub use profile::TimeSeriesProfileDesc; pub use profile::ProfilePoints; +pub use profile::TimeSeriesProfileDesc; pub use profile::TimeSeriesProfileName; pub use profile::TimeSeriesProfiles; pub use query_profile::QueryTimeSeriesProfile; From 6bf457296ecbeb3490f61ac2bacbc04bc98b0d5f Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Tue, 29 Apr 2025 21:32:09 +0800 Subject: [PATCH 10/15] fix: concurrent situation caused problem --- src/common/base/src/runtime/time_series/profile.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/common/base/src/runtime/time_series/profile.rs b/src/common/base/src/runtime/time_series/profile.rs index da2237682e0bd..d2598afdc083a 100644 --- a/src/common/base/src/runtime/time_series/profile.rs +++ b/src/common/base/src/runtime/time_series/profile.rs @@ -142,8 +142,7 @@ impl TimeSeriesProfiles { pub fn record(&self, name: TimeSeriesProfileName, value: usize) -> bool { let profile = &self.profiles[name as usize]; let now = chrono::Local::now().timestamp_millis() as usize / DEFAULT_INTERVAL; - let is_record = profile.record_time_slot(now, value); - is_record + profile.record_time_slot(now, value) } pub fn flush(&self, finish: bool, quota: &mut i32) -> Vec>> { From b9844ad7a6f07d9e93d8e0012a3320fa3a563b6b Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Tue, 29 Apr 2025 21:37:02 +0800 Subject: [PATCH 11/15] fix: concurrent situation caused problem --- src/common/base/tests/it/time_series/profile.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/common/base/tests/it/time_series/profile.rs b/src/common/base/tests/it/time_series/profile.rs index 6468b185e9f74..f44fb7cc28b59 100644 --- a/src/common/base/tests/it/time_series/profile.rs +++ b/src/common/base/tests/it/time_series/profile.rs @@ -173,7 +173,7 @@ fn test_record_inner_basic() -> Result<()> { fn test_record_inner_special() -> Result<()> { // Simulate concurrently recording but one thread is late let points = ProfilePoints::new(); - let now = 1000000001 as usize; + let now = 1000000001_usize; for i in 0..10 { points.record_time_slot(now, i); } @@ -196,7 +196,7 @@ fn test_record_inner_special() -> Result<()> { fn test_record_inner_special_invalid() -> Result<()> { // Simulate concurrently recording but one thread is later than 1 second let points = ProfilePoints::new(); - let now = 1000000001 as usize; + let now = 1000000001_usize; for i in 0..10 { points.record_time_slot(now, i); } From 81a1e966d9c9862ca34bf62e1090ff793d548aa8 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Wed, 30 Apr 2025 10:55:18 +0800 Subject: [PATCH 12/15] fix: not atomic op --- .../src/runtime/time_series/query_profile.rs | 29 +++++++++++++++---- .../base/tests/it/time_series/profile.rs | 23 +++++++++++++++ 2 files changed, 47 insertions(+), 5 deletions(-) diff --git a/src/common/base/src/runtime/time_series/query_profile.rs b/src/common/base/src/runtime/time_series/query_profile.rs index a7dd842b0bf85..a47a34e13e2d2 100644 --- a/src/common/base/src/runtime/time_series/query_profile.rs +++ b/src/common/base/src/runtime/time_series/query_profile.rs @@ -44,11 +44,8 @@ impl QueryTimeSeriesProfile { if let Some(global_profile) = x.borrow().payload.time_series_profile.as_ref() { - if global_profile.global_count.fetch_add(1, SeqCst) - == DEFAULT_BATCH_SIZE - 1 - { - global_profile.global_count.store(0, SeqCst); - // flush the global profile + let should_flush = Self::should_flush(&global_profile.global_count); + if should_flush { global_profile.flush(false); } } @@ -58,6 +55,28 @@ impl QueryTimeSeriesProfile { ) } + pub fn should_flush(global_count: &AtomicUsize) -> bool { + let mut prev = 0; + loop { + let next = if prev == DEFAULT_BATCH_SIZE - 1 { + 0 + } else { + prev + 1 + }; + match global_count.compare_exchange_weak(prev, next, SeqCst, SeqCst) { + Ok(_) => { + if next == 0 { + return true; + } + return false; + } + Err(next_prev) => { + prev = next_prev; + } + } + } + } + pub fn flush(&self, finish: bool) { #[derive(Serialize)] struct QueryTimeSeries { diff --git a/src/common/base/tests/it/time_series/profile.rs b/src/common/base/tests/it/time_series/profile.rs index f44fb7cc28b59..41c5f0e34df8a 100644 --- a/src/common/base/tests/it/time_series/profile.rs +++ b/src/common/base/tests/it/time_series/profile.rs @@ -1,9 +1,11 @@ +use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; use std::thread; use std::time::Duration; use databend_common_base::runtime::compress_time_point; use databend_common_base::runtime::ProfilePoints; +use databend_common_base::runtime::QueryTimeSeriesProfile; use databend_common_base::runtime::TimeSeriesProfileName; use databend_common_base::runtime::TimeSeriesProfiles; // Copyright 2021 Datafuse Labs @@ -214,3 +216,24 @@ fn test_record_inner_special_invalid() -> Result<()> { assert_eq!(points.value.load(SeqCst), 789); Ok(()) } + +#[test] +fn test_should_flush() -> Result<()> { + let global_count = AtomicUsize::new(0); + for _i in 0..1023 { + let query_profile = QueryTimeSeriesProfile::should_flush(&global_count); + assert_eq!(query_profile, false); + } + let query_profile = QueryTimeSeriesProfile::should_flush(&global_count); + assert_eq!(query_profile, true); + assert_eq!(global_count.load(SeqCst), 0); + for _i in 0..1023 { + let query_profile = QueryTimeSeriesProfile::should_flush(&global_count); + assert_eq!(query_profile, false); + } + let query_profile = QueryTimeSeriesProfile::should_flush(&global_count); + assert_eq!(query_profile, true); + assert_eq!(global_count.load(SeqCst), 0); + + Ok(()) +} From 543c1482cd2d98235d0201cb1d592f72465e7b43 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Wed, 30 Apr 2025 11:03:17 +0800 Subject: [PATCH 13/15] clippy --- src/common/base/tests/it/time_series/profile.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/common/base/tests/it/time_series/profile.rs b/src/common/base/tests/it/time_series/profile.rs index 41c5f0e34df8a..e2cdaf64e7806 100644 --- a/src/common/base/tests/it/time_series/profile.rs +++ b/src/common/base/tests/it/time_series/profile.rs @@ -222,17 +222,17 @@ fn test_should_flush() -> Result<()> { let global_count = AtomicUsize::new(0); for _i in 0..1023 { let query_profile = QueryTimeSeriesProfile::should_flush(&global_count); - assert_eq!(query_profile, false); + assert!(!query_profile); } let query_profile = QueryTimeSeriesProfile::should_flush(&global_count); - assert_eq!(query_profile, true); + assert!(query_profile); assert_eq!(global_count.load(SeqCst), 0); for _i in 0..1023 { let query_profile = QueryTimeSeriesProfile::should_flush(&global_count); - assert_eq!(query_profile, false); + assert!(!query_profile); } let query_profile = QueryTimeSeriesProfile::should_flush(&global_count); - assert_eq!(query_profile, true); + assert!(query_profile); assert_eq!(global_count.load(SeqCst), 0); Ok(()) From 6c17ab405ccc44c607486a3b704a72141997585a Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Tue, 6 May 2025 21:38:44 +0800 Subject: [PATCH 14/15] benchmark --- src/common/tracing/src/init.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/common/tracing/src/init.rs b/src/common/tracing/src/init.rs index b3b8db02b55d0..f75d2d3def2cc 100644 --- a/src/common/tracing/src/init.rs +++ b/src/common/tracing/src/init.rs @@ -108,7 +108,6 @@ fn env_filter(level: &str) -> EnvFilter { .filter(Some("databend::log::query"), LevelFilter::Off) .filter(Some("databend::log::profile"), LevelFilter::Off) .filter(Some("databend::log::structlog"), LevelFilter::Off) - .filter(Some("databend::log::time_series"), LevelFilter::Off) .parse(level), ) } From 53ccd2d58316722c3661750bc68f199e4ecaf0e2 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Tue, 6 May 2025 21:46:07 +0800 Subject: [PATCH 15/15] Revert "benchmark" This reverts commit 6c17ab405ccc44c607486a3b704a72141997585a. --- src/common/tracing/src/init.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/common/tracing/src/init.rs b/src/common/tracing/src/init.rs index f75d2d3def2cc..b3b8db02b55d0 100644 --- a/src/common/tracing/src/init.rs +++ b/src/common/tracing/src/init.rs @@ -108,6 +108,7 @@ fn env_filter(level: &str) -> EnvFilter { .filter(Some("databend::log::query"), LevelFilter::Off) .filter(Some("databend::log::profile"), LevelFilter::Off) .filter(Some("databend::log::structlog"), LevelFilter::Off) + .filter(Some("databend::log::time_series"), LevelFilter::Off) .parse(level), ) }