Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/interpreter/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ pub enum ChDigViews {
ServerLogs,
/// Show loggers (system.text_log)
Loggers,
/// Show background schedule pool tasks (system.background_schedule_pool)
BackgroundSchedulePool,
/// Spawn client inside chdig
Client,
}
Expand Down
63 changes: 63 additions & 0 deletions src/interpreter/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub enum Event {
ExplainPlanIndexes(String, String),
// (view_name, query)
SQLQuery(&'static str, String),
// (log_name, database, table, start, end)
BackgroundSchedulePoolLogs(String, String, String, RelativeDateTime, RelativeDateTime),
}

impl Event {
Expand All @@ -81,6 +83,7 @@ impl Event {
Event::ExplainPipelineOpenGraphInBrowser(..) => "ExplainPipelineOpenGraphInBrowser",
Event::ExplainPlanIndexes(..) => "ExplainPlanIndexes",
Event::SQLQuery(..) => "SQLQuery",
Event::BackgroundSchedulePoolLogs(..) => "BackgroundSchedulePoolLogs",
}
}
}
Expand Down Expand Up @@ -547,6 +550,66 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool)
}))
.map_err(|_| anyhow!("Cannot send message to UI"))?;
}
Event::BackgroundSchedulePoolLogs(log_name, database, table, start, end) => {
let dbtable = clickhouse.get_table_name("system", "background_schedule_pool_log");
let query = format!(
"SELECT DISTINCT query_id FROM {} WHERE log_name = '{}' AND database = '{}' AND table = '{}'",
dbtable,
log_name.replace('\'', "''"),
database.replace('\'', "''"),
table.replace('\'', "''"),
);

let columns = clickhouse.execute(&query).await?;
let mut query_ids = Vec::new();
for i in 0..columns.row_count() {
if let Ok(query_id) = columns.get::<String, _>(i, "query_id") {
query_ids.push(query_id);
}
}

if query_ids.is_empty() {
return Err(anyhow!(
"No entries for {} jobs (database: {}, table: {}, start: {}, end: {})",
log_name,
database,
table,
start,
end
));
}

cb_sink
.send(Box::new(move |siv: &mut cursive::Cursive| {
use cursive::view::Resizable;
let context = siv.user_data::<ContextArc>().unwrap().clone();
siv.add_layer(views::Dialog::around(
views::LinearLayout::vertical()
.child(
views::TextView::new(format!("Logs for task: {}", log_name))
.center(),
)
.child(views::DummyView.fixed_height(1))
.child(views::NamedView::new(
"background_schedule_pool_logs",
view::TextLogView::new(
"background_schedule_pool_logs",
context,
TextLogArguments {
query_ids: Some(query_ids),
logger_names: None,
message_filter: None,
max_level: None,
start: start.into(),
end,
},
),
)),
));
siv.focus_name("background_schedule_pool_logs").unwrap();
}))
.map_err(|_| anyhow!("Cannot send message to UI"))?;
}
}

return Ok(());
Expand Down
1 change: 1 addition & 0 deletions src/view/navigation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ impl Navigation for Cursive {
c.register_provider(Arc::new(ReplicationQueueViewProvider));
c.register_provider(Arc::new(ReplicasViewProvider));
c.register_provider(Arc::new(TablesViewProvider));
c.register_provider(Arc::new(BackgroundSchedulePoolViewProvider));
c.register_provider(Arc::new(BackupsViewProvider));
c.register_provider(Arc::new(DictionariesViewProvider));
c.register_provider(Arc::new(ServerLogsViewProvider));
Expand Down
120 changes: 120 additions & 0 deletions src/view/providers/background_schedule_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use crate::{
interpreter::{ContextArc, WorkerEvent, options::ChDigViews},
view::{self, navigation::Navigation, provider::ViewProvider},
};
use cursive::{
Cursive,
view::{Nameable, Resizable},
views::Dialog,
};
use std::collections::HashMap;

pub struct BackgroundSchedulePoolViewProvider;

impl ViewProvider for BackgroundSchedulePoolViewProvider {
fn name(&self) -> &'static str {
"Background jobs"
}

fn view_type(&self) -> ChDigViews {
ChDigViews::BackgroundSchedulePool
}

fn show(&self, siv: &mut Cursive, context: ContextArc) {
if siv.has_view("background_schedule_pool") {
return;
}

let mut columns = vec![
"pool",
"database",
"table",
"log_name",
"query_id",
"elapsed_ms",
"executing",
"scheduled",
"delayed",
];

let cluster = context.lock().unwrap().options.clickhouse.cluster.is_some();
let columns_to_compare = if cluster {
columns.insert(0, "hostName() host");
vec!["host", "pool", "database", "table", "log_name"]
} else {
vec!["pool", "database", "table", "log_name"]
};

let dbtable = context
.lock()
.unwrap()
.clickhouse
.get_table_name("system", "background_schedule_pool");

let query = format!(
"SELECT {} FROM {} ORDER BY pool, database, table, log_name",
columns.join(", "),
dbtable,
);

siv.drop_main_view();

let mut view = view::SQLQueryView::new(
context.clone(),
"background_schedule_pool",
"elapsed_ms",
columns.clone(),
columns_to_compare,
query,
)
.unwrap_or_else(|_| panic!("Cannot get background_schedule_pool"));

let background_schedule_pool_logs_callback =
move |siv: &mut Cursive, columns: Vec<&'static str>, row: view::QueryResultRow| {
show_logs_for_background_schedule_pool_task(siv, columns, row);
};
view.get_inner_mut()
.set_on_submit(background_schedule_pool_logs_callback);

let view = view.with_name("background_schedule_pool").full_screen();
siv.set_main_view(Dialog::around(view).title("Background Schedule Pool"));
}
}

fn show_logs_for_background_schedule_pool_task(
siv: &mut Cursive,
columns: Vec<&'static str>,
row: view::QueryResultRow,
) {
let row = row.0;

let mut map = HashMap::<String, String>::new();
columns.iter().zip(row.iter()).for_each(|(c, r)| {
let value = r.to_string();
map.insert(c.to_string(), value);
});

let log_name = map
.get("log_name")
.map(|s| s.to_owned())
.unwrap_or_default();
let database = map
.get("database")
.map(|s| s.to_owned())
.unwrap_or_default();
let table = map.get("table").map(|s| s.to_owned()).unwrap_or_default();

let context = siv.user_data::<ContextArc>().unwrap().clone();
let view_options = context.clone().lock().unwrap().options.view.clone();

context.lock().unwrap().worker.send(
true,
WorkerEvent::BackgroundSchedulePoolLogs(
log_name,
database,
table,
view_options.start,
view_options.end,
),
);
}
2 changes: 2 additions & 0 deletions src/view/providers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod background_schedule_pool;
mod backups;
mod client;
mod dictionaries;
Expand All @@ -13,6 +14,7 @@ mod s3queue;
mod server_logs;
mod tables;

pub use background_schedule_pool::BackgroundSchedulePoolViewProvider;
pub use backups::BackupsViewProvider;
pub use client::ClientViewProvider;
pub use dictionaries::DictionariesViewProvider;
Expand Down
Loading