Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions src/interpreter/background_runner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::{Arc, Condvar, Mutex};
use std::sync::{atomic, Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

Expand All @@ -17,6 +17,7 @@ use std::time::Duration;
pub struct BackgroundRunner {
interval: Duration,
thread: Option<thread::JoinHandle<()>>,
force: Arc<atomic::AtomicBool>,
exit: Arc<Mutex<bool>>,
cv: Arc<(Mutex<()>, Condvar)>,
}
Expand All @@ -32,30 +33,40 @@ impl Drop for BackgroundRunner {
}

impl BackgroundRunner {
pub fn new(interval: Duration, cv: Arc<(Mutex<()>, Condvar)>) -> Self {
pub fn new(
interval: Duration,
cv: Arc<(Mutex<()>, Condvar)>,
force: Arc<atomic::AtomicBool>,
) -> Self {
return Self {
interval,
thread: None,
force,
exit: Arc::new(Mutex::new(false)),
cv,
};
}

pub fn start<C: Fn() + std::marker::Send + 'static>(&mut self, callback: C) {
pub fn start<C: Fn(bool) + std::marker::Send + 'static>(&mut self, callback: C) {
let interval = self.interval;
let cv = self.cv.clone();
let exit = self.exit.clone();
let force = self.force.clone();
self.thread = Some(std::thread::spawn(move || loop {
callback();
let was_force = force.swap(false, atomic::Ordering::SeqCst);
callback(was_force);

let _ = cv.1.wait_timeout(cv.0.lock().unwrap(), interval).unwrap();
if *exit.lock().unwrap() {
break;
}
}));
// Explicitly trigger at least one update with force
self.schedule();
}

pub fn schedule(&mut self) {
self.force.store(true, atomic::Ordering::SeqCst);
self.cv.1.notify_all();
}
}
12 changes: 11 additions & 1 deletion src/interpreter/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use anyhow::Result;
use chdig::ActionDescription;
use chrono::Duration;
use cursive::{event::Event, event::EventResult, views::Dialog, views::OnEventView, Cursive, View};
use std::sync::{Arc, Condvar, Mutex};
use std::sync::{atomic, Arc, Condvar, Mutex};

pub type ContextArc = Arc<Mutex<Context>>;

Expand All @@ -27,6 +27,8 @@ pub struct Context {
pub server_version: String,
pub worker: Worker,
pub background_runner_cv: Arc<(Mutex<()>, Condvar)>,
pub background_runner_force: Arc<atomic::AtomicBool>,
pub background_runner_summary_force: Arc<atomic::AtomicBool>,

pub cb_sink: cursive::CbSink,

Expand All @@ -46,13 +48,17 @@ impl Context {
let server_version = clickhouse.version();
let worker = Worker::new();
let background_runner_cv = Arc::new((Mutex::new(()), Condvar::new()));
let background_runner_force = Arc::new(atomic::AtomicBool::new(false));
let background_runner_summary_force = Arc::new(atomic::AtomicBool::new(false));

let context = Arc::new(Mutex::new(Context {
options,
clickhouse,
server_version,
worker,
background_runner_cv,
background_runner_force,
background_runner_summary_force,
cb_sink,
global_actions: Vec::new(),
views_menu_actions: Vec::new(),
Expand Down Expand Up @@ -153,6 +159,10 @@ impl Context {
}

pub fn trigger_view_refresh(&self) {
self.background_runner_force
.store(true, atomic::Ordering::SeqCst);
self.background_runner_summary_force
.store(true, atomic::Ordering::SeqCst);
self.background_runner_cv.1.notify_all();
}

Expand Down
5 changes: 3 additions & 2 deletions src/interpreter/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ impl Worker {
return self.paused;
}

pub fn send(&mut self, event: Event) {
if self.paused {
// @force - ignore pause
pub fn send(&mut self, force: bool, event: Event) {
if !force && self.paused {
return;
}

Expand Down
9 changes: 5 additions & 4 deletions src/view/navigation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,13 +689,14 @@ impl Navigation for Cursive {
let start = context.options.view.start;
let end = context.options.view.end;
if let Some(trace_type) = trace_type {
context.worker.send(WorkerEvent::ShowServerFlameGraph(
tui, trace_type, start, end,
));
context.worker.send(
true,
WorkerEvent::ShowServerFlameGraph(tui, trace_type, start, end),
);
} else {
context
.worker
.send(WorkerEvent::ShowLiveQueryFlameGraph(tui, None));
.send(true, WorkerEvent::ShowLiveQueryFlameGraph(tui, None));
}
}

Expand Down
66 changes: 36 additions & 30 deletions src/view/processes_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,17 +329,21 @@ impl ProcessesView {
self.get_query_ids()?;
let mut context_locked = self.context.lock().unwrap();
if let Some(trace_type) = trace_type {
context_locked.worker.send(WorkerEvent::ShowQueryFlameGraph(
trace_type,
tui,
min_query_start_microseconds,
max_query_end_microseconds,
query_ids,
));
context_locked.worker.send(
true,
WorkerEvent::ShowQueryFlameGraph(
trace_type,
tui,
min_query_start_microseconds,
max_query_end_microseconds,
query_ids,
),
);
} else {
context_locked
.worker
.send(WorkerEvent::ShowLiveQueryFlameGraph(tui, Some(query_ids)));
context_locked.worker.send(
true,
WorkerEvent::ShowLiveQueryFlameGraph(tui, Some(query_ids)),
);
}

return Ok(());
Expand Down Expand Up @@ -453,7 +457,7 @@ impl ProcessesView {
let update_callback_context = context.clone();
let update_callback_filter = filter.clone();
let update_callback_limit = limit.clone();
let update_callback = move || {
let update_callback = move |force: bool| {
let mut context = update_callback_context.lock().unwrap();
let filter = update_callback_filter.lock().unwrap().clone();
let limit = *update_callback_limit.lock().unwrap();
Expand All @@ -464,13 +468,15 @@ impl ProcessesView {
match processes_type {
Type::ProcessList => context
.worker
.send(WorkerEvent::UpdateProcessList(filter, limit)),
Type::SlowQueryLog => context.worker.send(WorkerEvent::UpdateSlowQueryLog(
filter, start_time, end_time, limit,
)),
Type::LastQueryLog => context.worker.send(WorkerEvent::UpdateLastQueryLog(
filter, start_time, end_time, limit,
)),
.send(force, WorkerEvent::UpdateProcessList(filter, limit)),
Type::SlowQueryLog => context.worker.send(
force,
WorkerEvent::UpdateSlowQueryLog(filter, start_time, end_time, limit),
),
Type::LastQueryLog => context.worker.send(
force,
WorkerEvent::UpdateLastQueryLog(filter, start_time, end_time, limit),
),
}
};

Expand Down Expand Up @@ -504,7 +510,8 @@ impl ProcessesView {
}

let bg_runner_cv = context.lock().unwrap().background_runner_cv.clone();
let mut bg_runner = BackgroundRunner::new(delay, bg_runner_cv);
let bg_runner_force = context.lock().unwrap().background_runner_force.clone();
let mut bg_runner = BackgroundRunner::new(delay, bg_runner_cv, bg_runner_force);
bg_runner.start(update_callback);

let processes_view = ProcessesView {
Expand Down Expand Up @@ -826,7 +833,7 @@ impl ProcessesView {
// TODO: add support for Log packets into clickhouse-rs and execute query with logging in place
context_locked
.worker
.send(WorkerEvent::ExecuteQuery(database, query));
.send(true, WorkerEvent::ExecuteQuery(database, query));

return Ok(Some(EventResult::Consumed(Some(Callback::from_fn_once(
|siv| siv.clear(),
Expand Down Expand Up @@ -868,7 +875,7 @@ impl ProcessesView {
let mut context_locked = v.context.lock().unwrap();
context_locked
.worker
.send(WorkerEvent::ExplainSyntax(database, query, settings));
.send(true, WorkerEvent::ExplainSyntax(database, query, settings));

return Ok(Some(EventResult::consumed()));
});
Expand All @@ -880,7 +887,7 @@ impl ProcessesView {
let mut context_locked = v.context.lock().unwrap();
context_locked
.worker
.send(WorkerEvent::ExplainPlan(database, query));
.send(true, WorkerEvent::ExplainPlan(database, query));

return Ok(Some(EventResult::consumed()));
});
Expand All @@ -892,7 +899,7 @@ impl ProcessesView {
let mut context_locked = v.context.lock().unwrap();
context_locked
.worker
.send(WorkerEvent::ExplainPipeline(database, query));
.send(true, WorkerEvent::ExplainPipeline(database, query));

return Ok(Some(EventResult::consumed()));
});
Expand All @@ -906,11 +913,10 @@ impl ProcessesView {
let query = selected_query.original_query.clone();
let database = selected_query.current_database.clone();
let mut context_locked = v.context.lock().unwrap();
context_locked
.worker
.send(WorkerEvent::ExplainPipelineOpenGraphInBrowser(
database, query,
));
context_locked.worker.send(
true,
WorkerEvent::ExplainPipelineOpenGraphInBrowser(database, query),
);

return Ok(Some(EventResult::consumed()));
},
Expand All @@ -923,7 +929,7 @@ impl ProcessesView {
let mut context_locked = v.context.lock().unwrap();
context_locked
.worker
.send(WorkerEvent::ExplainPlanIndexes(database, query));
.send(true, WorkerEvent::ExplainPlanIndexes(database, query));

return Ok(Some(EventResult::consumed()));
});
Expand All @@ -948,7 +954,7 @@ impl ProcessesView {
.lock()
.unwrap()
.worker
.send(WorkerEvent::KillQuery(query_id.clone()));
.send(true, WorkerEvent::KillQuery(query_id.clone()));
// TODO: wait for the KILL
s.pop_layer();
})
Expand Down
7 changes: 4 additions & 3 deletions src/view/query_result_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,12 @@ impl QueryResultView {
let delay = context.lock().unwrap().options.view.delay_interval;

let update_callback_context = context.clone();
let update_callback = move || {
let update_callback = move |force: bool| {
update_callback_context
.lock()
.unwrap()
.worker
.send(WorkerEvent::ViewQuery(view_name, query.clone()));
.send(force, WorkerEvent::ViewQuery(view_name, query.clone()));
};

let columns = parse_columns(&columns);
Expand Down Expand Up @@ -212,7 +212,8 @@ impl QueryResultView {
});

let bg_runner_cv = context.lock().unwrap().background_runner_cv.clone();
let mut bg_runner = BackgroundRunner::new(delay, bg_runner_cv);
let bg_runner_force = context.lock().unwrap().background_runner_force.clone();
let mut bg_runner = BackgroundRunner::new(delay, bg_runner_cv, bg_runner_force);
bg_runner.start(update_callback);

let view = QueryResultView {
Expand Down
11 changes: 8 additions & 3 deletions src/view/summary_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ impl SummaryView {
let delay = context.lock().unwrap().options.view.delay_interval;

let update_callback_context = context.clone();
let update_callback = move || {
let update_callback = move |force: bool| {
update_callback_context
.lock()
.unwrap()
.worker
.send(WorkerEvent::UpdateSummary);
.send(force, WorkerEvent::UpdateSummary);
};

let layout = views::LinearLayout::vertical()
Expand Down Expand Up @@ -196,7 +196,12 @@ impl SummaryView {
);

let bg_runner_cv = context.lock().unwrap().background_runner_cv.clone();
let mut bg_runner = BackgroundRunner::new(delay, bg_runner_cv);
let bg_runner_force = context
.lock()
.unwrap()
.background_runner_summary_force
.clone();
let mut bg_runner = BackgroundRunner::new(delay, bg_runner_cv, bg_runner_force);
bg_runner.start(update_callback);

return Self {
Expand Down
36 changes: 18 additions & 18 deletions src/view/text_log_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,34 +53,34 @@ impl TextLogView {
if query_ids.is_some() {
max_query_end_microseconds += Duration::try_seconds(3).unwrap();
}
context
.lock()
.unwrap()
.worker
.send(WorkerEvent::GetQueryTextLog(
context.lock().unwrap().worker.send(
true,
WorkerEvent::GetQueryTextLog(
view_name,
query_ids.clone(),
query_start_microseconds,
Some(max_query_end_microseconds),
));
),
);
} else {
let update_query_ids = query_ids.clone();
let update_last_event_time_microseconds = last_event_time_microseconds.clone();
let update_callback_context = context.clone();
let update_callback =
move || {
update_callback_context.lock().unwrap().worker.send(
WorkerEvent::GetQueryTextLog(
view_name,
update_query_ids.clone(),
*update_last_event_time_microseconds.lock().unwrap(),
max_query_end_microseconds,
),
);
};
let update_callback = move |force: bool| {
update_callback_context.lock().unwrap().worker.send(
force,
WorkerEvent::GetQueryTextLog(
view_name,
update_query_ids.clone(),
*update_last_event_time_microseconds.lock().unwrap(),
max_query_end_microseconds,
),
);
};

let bg_runner_cv = context.lock().unwrap().background_runner_cv.clone();
let mut created_bg_runner = BackgroundRunner::new(delay, bg_runner_cv);
let bg_runner_force = context.lock().unwrap().background_runner_force.clone();
let mut created_bg_runner = BackgroundRunner::new(delay, bg_runner_cv, bg_runner_force);
created_bg_runner.start(update_callback);
bg_runner = Some(created_bg_runner);
}
Expand Down
Loading