Skip to content

Commit 21b36a9

Browse files
committed
chore(executor): add some time tracking log
1 parent cdb2e25 commit 21b36a9

File tree

6 files changed

+91
-11
lines changed

6 files changed

+91
-11
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/base/src/runtime/defer.rs

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
pub fn defer<F: FnOnce() -> R, R>(f: F) -> impl Drop {
16+
struct Defer<F: FnOnce() -> R, R>(Option<F>);
17+
18+
impl<F: FnOnce() -> R, R> Drop for Defer<F, R> {
19+
fn drop(&mut self) {
20+
self.0.take().unwrap()();
21+
}
22+
}
23+
24+
Defer(Some(f))
25+
}

src/common/base/src/runtime/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
mod backtrace;
1616
mod catch_unwind;
17+
mod defer;
1718
pub mod error_info;
1819
mod global_runtime;
1920
mod memory;
@@ -30,6 +31,7 @@ pub use backtrace::AsyncTaskItem;
3031
pub use catch_unwind::catch_unwind;
3132
pub use catch_unwind::drop_guard;
3233
pub use catch_unwind::CatchUnwindFuture;
34+
pub use defer::defer;
3335
pub use global_runtime::GlobalIORuntime;
3436
pub use global_runtime::GlobalQueryRuntime;
3537
pub use memory::set_alloc_error_hook;

src/query/pipeline/core/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ futures = { workspace = true }
2222
minitrace = { workspace = true }
2323
petgraph = "0.6.2"
2424
serde = { workspace = true }
25+
log = "0.4.21"
2526

2627
[dev-dependencies]
2728
serde = { workspace = true }

src/query/pipeline/core/src/pipeline.rs

+44-1
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@ use std::fmt::Formatter;
1717
use std::sync::atomic::AtomicUsize;
1818
use std::sync::atomic::Ordering;
1919
use std::sync::Arc;
20+
use std::time::Instant;
2021

22+
use databend_common_base::runtime::defer;
2123
use databend_common_base::runtime::drop_guard;
2224
use databend_common_exception::ErrorCode;
2325
use databend_common_exception::Result;
26+
use log::info;
2427
use petgraph::matrix_graph::Zero;
2528

2629
use crate::pipe::Pipe;
@@ -440,10 +443,24 @@ impl Pipeline {
440443
}
441444
}
442445

446+
#[track_caller]
443447
pub fn set_on_init<F: FnOnce() -> Result<()> + Send + Sync + 'static>(&mut self, f: F) {
448+
let location = std::panic::Location::caller();
444449
if let Some(old_on_init) = self.on_init.take() {
445450
self.on_init = Some(Box::new(move || {
446451
old_on_init()?;
452+
let instants = Instant::now();
453+
454+
let _guard = defer(move || {
455+
info!(
456+
"OnFinished callback elapsed: {:?} while in {}:{}:{}",
457+
instants.elapsed(),
458+
location.file(),
459+
location.line(),
460+
location.column()
461+
);
462+
});
463+
447464
f()
448465
}));
449466

@@ -453,33 +470,59 @@ impl Pipeline {
453470
self.on_init = Some(Box::new(f));
454471
}
455472

473+
#[track_caller]
456474
pub fn set_on_finished<
457475
F: FnOnce((&Vec<PlanProfile>, &Result<()>)) -> Result<()> + Send + Sync + 'static,
458476
>(
459477
&mut self,
460478
f: F,
461479
) {
480+
let location = std::panic::Location::caller();
462481
if let Some(on_finished) = self.on_finished.take() {
463482
self.on_finished = Some(Box::new(move |(profiles, may_error)| {
464483
on_finished((profiles, may_error))?;
484+
let instants = Instant::now();
485+
let _guard = defer(move || {
486+
info!(
487+
"OnFinished callback elapsed: {:?} while in {}:{}:{}",
488+
instants.elapsed(),
489+
location.file(),
490+
location.line(),
491+
location.column()
492+
);
493+
});
494+
465495
f((profiles, may_error))
466496
}));
467-
468497
return;
469498
}
470499

471500
self.on_finished = Some(Box::new(f));
472501
}
473502

503+
#[track_caller]
474504
pub fn push_front_on_finished_callback<
475505
F: FnOnce((&Vec<PlanProfile>, &Result<()>)) -> Result<()> + Send + Sync + 'static,
476506
>(
477507
&mut self,
478508
f: F,
479509
) {
510+
let location = std::panic::Location::caller();
480511
if let Some(on_finished) = self.on_finished.take() {
481512
self.on_finished = Some(Box::new(move |(profiles, may_error)| {
513+
let instants = Instant::now();
514+
let guard = defer(move || {
515+
info!(
516+
"OnFinished callback elapsed: {:?} while in {}:{}:{}",
517+
instants.elapsed(),
518+
location.file(),
519+
location.line(),
520+
location.column()
521+
);
522+
});
523+
482524
f((profiles, may_error))?;
525+
drop(guard);
483526
on_finished((profiles, may_error))
484527
}));
485528

src/query/service/src/pipelines/executor/pipeline_executor.rs

+18-10
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::time::Instant;
2020

2121
use databend_common_base::base::WatchNotify;
2222
use databend_common_base::runtime::catch_unwind;
23+
use databend_common_base::runtime::defer;
2324
use databend_common_base::runtime::profile::Profile;
2425
use databend_common_base::runtime::GlobalIORuntime;
2526
use databend_common_base::runtime::TrySpawn;
@@ -179,6 +180,13 @@ impl PipelineExecutor {
179180
}
180181

181182
pub fn execute(&self) -> Result<()> {
183+
let instants = Instant::now();
184+
let _guard = defer(move || {
185+
info!(
186+
"Pipeline executor finished, elapsed: {:?}",
187+
instants.elapsed()
188+
);
189+
});
182190
match self {
183191
PipelineExecutor::QueryPipelineExecutor(executor) => executor.execute(),
184192
PipelineExecutor::QueriesPipelineExecutor(query_wrapper) => {
@@ -233,16 +241,16 @@ impl PipelineExecutor {
233241
let this_graph = Arc::downgrade(&query_wrapper.graph);
234242
let finished_notify = query_wrapper.finished_notify.clone();
235243
GlobalIORuntime::instance().spawn(async move {
236-
let finished_future = Box::pin(finished_notify.notified());
237-
let max_execute_future = Box::pin(tokio::time::sleep(max_execute_time_in_seconds));
238-
if let Either::Left(_) = select(max_execute_future, finished_future).await {
239-
if let Some(graph) = this_graph.upgrade() {
240-
graph.should_finish(Err(ErrorCode::AbortedQuery(
241-
"Aborted query, because the execution time exceeds the maximum execution time limit",
242-
))).expect("exceed max execute time, but cannot send error message");
243-
}
244-
}
245-
});
244+
let finished_future = Box::pin(finished_notify.notified());
245+
let max_execute_future = Box::pin(tokio::time::sleep(max_execute_time_in_seconds));
246+
if let Either::Left(_) = select(max_execute_future, finished_future).await {
247+
if let Some(graph) = this_graph.upgrade() {
248+
graph.should_finish(Err(ErrorCode::AbortedQuery(
249+
"Aborted query, because the execution time exceeds the maximum execution time limit",
250+
))).expect("exceed max execute time, but cannot send error message");
251+
}
252+
}
253+
});
246254
}
247255

248256
Ok(())

0 commit comments

Comments
 (0)