refactor: refine system history tables and add login table #17937

Merged · 5 commits · Jun 1, 2025

4 changes: 4 additions & 0 deletions .github/workflows/reuse.linux.yml
@@ -141,6 +141,10 @@ jobs:
needs: [build, check]
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/setup_license
with:
runner_provider: ${{ inputs.runner_provider }}
type: ${{ inputs.license_type }}
- uses: ./.github/actions/test_logs
timeout-minutes: 20

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

11 changes: 10 additions & 1 deletion scripts/ci/deploy/config/databend-query-node-otlp-logs.toml
@@ -65,10 +65,19 @@ otlp_protocol = "http"
pkey1 = "pvalue1"
pkey2 = "pvalue2"

[log.persistentlog]
[log.history]
on = true
level = "INFO"

[[log.history.tables]]
table_name = "query_history"

[[log.history.tables]]
table_name = "profile_history"

[[log.history.tables]]
table_name = "login_history"

[meta]
endpoints = ["0.0.0.0:9191"]
username = "root"
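
The [log.persistentlog] section is replaced by [log.history], and individual history tables are now opted into via repeated [[log.history.tables]] entries. A fuller configuration sketch, assuming the query-side config file exposes the same keys as the HistoryConfig and HistoryTableConfig structs introduced below (values here are illustrative; per-table retention is in hours and defaults to 168):

[log.history]
on = true
level = "INFO"

[[log.history.tables]]
table_name = "query_history"
retention = 168

[[log.history.tables]]
table_name = "profile_history"

[[log.history.tables]]
table_name = "login_history"
retention = 72
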
8 changes: 4 additions & 4 deletions src/binaries/query/entry.rs
@@ -32,8 +32,8 @@ use databend_common_version::DATABEND_GIT_SEMVER;
use databend_common_version::DATABEND_GIT_SHA;
use databend_common_version::DATABEND_SEMVER;
use databend_query::clusters::ClusterDiscovery;
use databend_query::history_tables::GlobalHistoryLog;
use databend_query::local;
use databend_query::persistent_log::GlobalPersistentLog;
use databend_query::servers::admin::AdminService;
use databend_query::servers::flight::FlightService;
use databend_query::servers::metrics::MetricService;
@@ -291,9 +291,9 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {
if conf.log.structlog.on {
println!(" structlog: {}", conf.log.structlog);
}
if conf.log.persistentlog.on {
GlobalPersistentLog::instance().initialized();
println!(" persistentlog: {}", conf.log.persistentlog);
if conf.log.history.on {
GlobalHistoryLog::instance().initialized();
println!(" system history tables: {}", conf.log.history);
}

println!();
1 change: 1 addition & 0 deletions src/common/tracing/Cargo.toml
@@ -30,6 +30,7 @@ opentelemetry_sdk = { workspace = true }
parquet = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
toml = { workspace = true }
tonic = { workspace = true }

[dev-dependencies]
39 changes: 29 additions & 10 deletions src/common/tracing/src/config.rs
@@ -16,6 +16,8 @@ use std::collections::BTreeMap;
use std::fmt::Display;
use std::fmt::Formatter;

use itertools::Itertools;

// use the uncommon usage of Pascal Case level name
// to partially avoid the feature of serfig that can not override with the default value.
// see https://github.com/Xuanwo/serfig/issues/23 for detail.
@@ -31,7 +33,7 @@ pub struct Config {
pub profile: ProfileLogConfig,
pub structlog: StructLogConfig,
pub tracing: TracingConfig,
pub persistentlog: PersistentLogConfig,
pub history: HistoryConfig,
}

impl Config {
@@ -336,42 +338,59 @@ impl Default for OTLPEndpointConfig {
}

#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)]
pub struct PersistentLogConfig {
pub struct HistoryConfig {
pub on: bool,
pub interval: usize,
pub stage_name: String,
pub level: String,
pub retention: usize,
pub retention_interval: usize,
pub tables: Vec<HistoryTableConfig>,
}

#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)]
pub struct HistoryTableConfig {
pub table_name: String,
pub retention: usize,
}

impl Display for PersistentLogConfig {
impl Display for HistoryConfig {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"enabled={}, interval={}, stage_name={}, level={}, retention={}, retention_interval={}",
"enabled={}, interval={}, stage_name={}, level={}, retention_interval={}, tables=[{}]",
self.on,
self.interval,
self.stage_name,
self.level,
self.retention,
self.retention_interval
self.retention_interval,
self.tables
.iter()
.map(|f| format!("{}({} hours)", f.table_name.clone(), f.retention))
.join(", ")
)
}
}

impl Default for PersistentLogConfig {
impl Default for HistoryConfig {
fn default() -> Self {
Self {
on: false,
interval: 2,
// The default value of stage name uses an uuid to avoid conflicts with existing stages
stage_name: "log_1f93b76af0bd4b1d8e018667865fbc65".to_string(),
level: "WARN".to_string(),
// Data older than 72 hours will be deleted during retention tasks
retention: 72,
// Trigger the retention task every 24 hours
retention_interval: 24,
tables: vec![],
}
}
}

impl Default for HistoryTableConfig {
fn default() -> Self {
Self {
table_name: "".to_string(),
retention: 168,
}
}
}
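
As a quick illustration of the new Display output, a minimal sketch (assuming the crate is importable as databend_common_tracing; the expected string follows directly from the format string above):

use databend_common_tracing::HistoryConfig;
use databend_common_tracing::HistoryTableConfig;

fn main() {
    let cfg = HistoryConfig {
        on: true,
        tables: vec![HistoryTableConfig {
            table_name: "query_history".to_string(),
            retention: 168,
        }],
        ..Default::default()
    };
    // Prints roughly:
    // enabled=true, interval=2, stage_name=log_1f93b76af0bd4b1d8e018667865fbc65, level=WARN, retention_interval=24, tables=[query_history(168 hours)]
    println!("{cfg}");
}
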
28 changes: 12 additions & 16 deletions src/common/tracing/src/init.rs
@@ -34,6 +34,7 @@ use crate::config::OTLPProtocol;
use crate::filter::filter_by_thread_tracker;
use crate::loggers::get_layout;
use crate::loggers::new_rolling_file_appender;
use crate::predefined_tables::table_to_target;
use crate::remote_log::RemoteLog;
use crate::structlog::StructLogReporter;
use crate::Config;
@@ -386,31 +387,26 @@ pub fn init_logging(
.append(structlog_log_file);
logger = logger.dispatch(dispatch);
}

if cfg.persistentlog.on {
if cfg.history.on {
let (remote_log, flush_guard) =
RemoteLog::new(&labels, cfg).expect("initialize remote logger");

let mut filter_builder =
EnvFilterBuilder::new().filter(Some("databend::log::structlog"), LevelFilter::Off);

if cfg.profile.on {
filter_builder =
filter_builder.filter(Some("databend::log::profile"), LevelFilter::Trace);
} else {
filter_builder =
filter_builder.filter(Some("databend::log::profile"), LevelFilter::Off);
let mut table_to_target = table_to_target();

for table_cfg in cfg.history.tables.iter() {
if let Some(target) = table_to_target.remove(&table_cfg.table_name) {
filter_builder = filter_builder.filter(Some(&target), LevelFilter::Trace);
}
}
if cfg.query.on {
filter_builder =
filter_builder.filter(Some("databend::log::query"), LevelFilter::Trace);
} else {
filter_builder = filter_builder.filter(Some("databend::log::query"), LevelFilter::Off);
for (_, target) in table_to_target {
filter_builder = filter_builder.filter(Some(&target), LevelFilter::Off);
}

let dispatch = Dispatch::new()
.filter(EnvFilter::new(
filter_builder.parse(&cfg.persistentlog.level),
))
.filter(EnvFilter::new(filter_builder.parse(&cfg.history.level)))
.filter(filter_by_thread_tracker())
.append(remote_log);

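
The net effect of the new wiring: every predefined table except log_history maps to a log target, each enabled table gets its target raised to Trace, and whatever remains in the map is switched Off. A simplified sketch of that selection logic using only std types (the real code feeds the same decisions into the EnvFilterBuilder shown above rather than printing them; the enabled set here is hypothetical):

use std::collections::HashMap;

fn main() {
    // Tables enabled under [[log.history.tables]] (hypothetical selection).
    let enabled = ["query_history", "login_history"];

    // Mirrors table_to_target(): the predefined tables minus log_history.
    let mut table_to_target: HashMap<&str, &str> = HashMap::from([
        ("query_history", "databend::log::query"),
        ("profile_history", "databend::log::profile"),
        ("login_history", "databend::log::login"),
    ]);

    // Enabled tables keep their target at Trace ...
    for name in enabled {
        if let Some(target) = table_to_target.remove(name) {
            println!("{target} -> Trace");
        }
    }
    // ... and every remaining target is turned Off.
    for (_, target) in table_to_target {
        println!("{target} -> Off");
    }
}
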
9 changes: 6 additions & 3 deletions src/common/tracing/src/lib.rs
@@ -26,15 +26,18 @@ mod panic_hook;
mod remote_log;
mod structlog;

mod predefined_tables;

pub use crash_hook::pipe_file;
pub use crash_hook::SignalListener;

pub use crate::config::Config;
pub use crate::config::FileConfig;
pub use crate::config::HistoryConfig;
pub use crate::config::HistoryTableConfig;
pub use crate::config::OTLPConfig;
pub use crate::config::OTLPEndpointConfig;
pub use crate::config::OTLPProtocol;
pub use crate::config::PersistentLogConfig;
pub use crate::config::ProfileLogConfig;
pub use crate::config::QueryLogConfig;
pub use crate::config::StderrConfig;
@@ -48,16 +51,16 @@ pub use crate::init::start_trace_for_remote_request;
pub use crate::init::GlobalLogger;
pub use crate::panic_hook::log_panic;
pub use crate::panic_hook::set_panic_hook;
pub use crate::predefined_tables::init_history_tables;
pub use crate::predefined_tables::HistoryTable;
pub use crate::remote_log::convert_to_batch;
pub use crate::remote_log::LogBuffer as RemoteLogBuffer;
pub use crate::remote_log::LogMessage;
pub use crate::remote_log::RemoteLog;
pub use crate::remote_log::RemoteLogElement;
pub use crate::remote_log::RemoteLogGuard;
pub use crate::remote_log::PERSISTENT_LOG_SCHEMA_VERSION;
pub use crate::structlog::DummyReporter;
pub use crate::structlog::StructLogReporter;

pub fn closure_name<F: std::any::Any>() -> &'static str {
let func_path = std::any::type_name::<F>();
func_path
118 changes: 118 additions & 0 deletions src/common/tracing/src/predefined_tables/history_tables.rs
@@ -0,0 +1,118 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::HashMap;
use std::sync::Arc;

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;

use crate::HistoryConfig;

const TABLES_TOML: &str = include_str!("./history_tables.toml");

#[derive(Debug)]
pub struct HistoryTable {
pub name: String,
pub create: String,
pub transform: String,
pub delete: String,
}

impl HistoryTable {
pub fn create(predefined: PredefinedTable, retention: u64) -> Self {
HistoryTable {
name: predefined.name,
create: predefined.create,
transform: predefined.transform,
delete: predefined
.delete
.replace("{retention_hours}", &retention.to_string()),
}
}

pub fn assemble_log_history_transform(&self, stage_name: &str, batch_number: u64) -> String {
let mut transform = self.transform.clone();
transform = transform.replace("{stage_name}", stage_name);
transform = transform.replace("{batch_number}", &batch_number.to_string());
transform
}

pub fn assemble_normal_transform(&self, begin: u64, end: u64) -> String {
let mut transform = self.transform.clone();
transform = transform.replace("{batch_begin}", &begin.to_string());
transform = transform.replace("{batch_end}", &end.to_string());
transform
}
}

#[derive(serde::Deserialize)]
pub struct PredefinedTables {
pub tables: Vec<PredefinedTable>,
}

#[derive(serde::Deserialize, Ord, PartialOrd, Eq, PartialEq)]
pub struct PredefinedTable {
pub name: String,
pub target: String,
pub create: String,
pub transform: String,
pub delete: String,
}

pub fn init_history_tables(cfg: &HistoryConfig) -> Result<Vec<Arc<HistoryTable>>> {
let predefined_tables: PredefinedTables =
toml::from_str(TABLES_TOML).expect("Failed to parse toml");

let mut predefined_map: BTreeMap<String, PredefinedTable> = BTreeMap::from_iter(
predefined_tables
.tables
.into_iter()
.map(|table| (table.name.clone(), table)),
);

let mut history_tables = Vec::with_capacity(cfg.tables.len());
history_tables.push(Arc::new(HistoryTable::create(
predefined_map.remove("log_history").unwrap(),
24 * 7,
)));
for enable_table in cfg.tables.iter() {
if let Some(predefined_table) = predefined_map.remove(&enable_table.table_name) {
let retention = enable_table.retention;
history_tables.push(Arc::new(HistoryTable::create(
predefined_table,
retention as u64,
)));
} else {
return Err(ErrorCode::InvalidConfig(format!(
"Invalid history table name {}",
enable_table.table_name
)));
}
}
Ok(history_tables)
}

pub fn table_to_target() -> HashMap<String, String> {
let predefined_tables: PredefinedTables =
toml::from_str(TABLES_TOML).expect("Failed to parse toml");
let mut table_to_target = HashMap::new();
for table in predefined_tables.tables {
if table.name != "log_history" {
table_to_target.insert(table.name, table.target);
}
}
table_to_target
}
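
A small usage sketch of the new helpers (the real call sites live in the query crate's GlobalHistoryLog, which is outside this diff; crate paths and config values here are assumptions for illustration):

use databend_common_exception::Result;
use databend_common_tracing::init_history_tables;
use databend_common_tracing::HistoryConfig;
use databend_common_tracing::HistoryTableConfig;

fn demo() -> Result<()> {
    let cfg = HistoryConfig {
        on: true,
        tables: vec![HistoryTableConfig {
            table_name: "query_history".to_string(),
            retention: 168,
        }],
        ..Default::default()
    };

    // log_history always comes first, followed by each enabled table;
    // an unknown table_name yields ErrorCode::InvalidConfig.
    let tables = init_history_tables(&cfg)?;

    // `create` is runnable as-is; `transform` and `delete` still carry
    // placeholders that the caller fills in per batch.
    let copy_into_raw = tables[0].assemble_log_history_transform("my_stage", 42);
    let insert_derived = tables[1].assemble_normal_transform(40, 42);
    println!("{copy_into_raw}\n{insert_derived}");
    Ok(())
}
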
27 changes: 27 additions & 0 deletions src/common/tracing/src/predefined_tables/history_tables.toml
@@ -0,0 +1,27 @@
[[tables]]
name = "log_history"
target = ""
create = "CREATE TABLE IF NOT EXISTS system_history.log_history (timestamp TIMESTAMP NULL, path STRING NULL, target STRING NULL, log_level STRING NULL, cluster_id STRING NULL, node_id STRING NULL, warehouse_id STRING NULL, query_id STRING NULL, message STRING NULL, fields VARIANT NULL, batch_number Int64) CLUSTER BY LINEAR(timestamp, query_id)"
transform = "COPY INTO system_history.log_history FROM (SELECT timestamp, path, target, log_level, cluster_id,node_id, warehouse_id, query_id, message, fields, {batch_number} FROM @{stage_name}) file_format = (TYPE = PARQUET) PURGE = TRUE"
delete = "DELETE FROM system_history.log_history WHERE timestamp < subtract_hours(NOW(), {retention_hours})"

[[tables]]
name = "query_history"
target = "databend::log::query"
create = "CREATE TABLE IF NOT EXISTS system_history.query_history (log_type Int8 NULL, log_type_name STRING NULL, handler_type STRING NULL, tenant_id STRING NULL, cluster_id STRING NULL, node_id STRING NULL, sql_user STRING NULL, sql_user_quota STRING NULL, sql_user_privileges STRING NULL, query_id STRING NULL, query_kind STRING NULL, query_text STRING NULL, query_hash STRING NULL, query_parameterized_hash STRING NULL, event_date DATE NULL, event_time TIMESTAMP NULL, query_start_time TIMESTAMP NULL, query_duration_ms Int64 NULL, query_queued_duration_ms Int64 NULL, current_database STRING NULL, written_rows UInt64 NULL, written_bytes UInt64 NULL, join_spilled_rows UInt64 NULL, join_spilled_bytes UInt64 NULL, agg_spilled_rows UInt64 NULL, agg_spilled_bytes UInt64 NULL, group_by_spilled_rows UInt64 NULL, group_by_spilled_bytes UInt64 NULL, written_io_bytes UInt64 NULL, written_io_bytes_cost_ms UInt64 NULL, scan_rows UInt64 NULL, scan_bytes UInt64 NULL, scan_io_bytes UInt64 NULL, scan_io_bytes_cost_ms UInt64 NULL, scan_partitions UInt64 NULL, total_partitions UInt64 NULL, result_rows UInt64 NULL, result_bytes UInt64 NULL, bytes_from_remote_disk UInt64 NULL, bytes_from_local_disk UInt64 NULL, bytes_from_memory UInt64 NULL, client_address STRING NULL, user_agent STRING NULL, exception_code Int32 NULL, exception_text STRING NULL, server_version STRING NULL, query_tag STRING NULL, has_profile BOOLEAN NULL, peek_memory_usage VARIANT NULL) CLUSTER BY LINEAR(event_time, query_id)"
transform = "INSERT INTO system_history.query_history FROM (SELECT m['log_type'], m['log_type_name'], m['handler_type'], m['tenant_id'], m['cluster_id'], m['node_id'], m['sql_user'], m['sql_user_quota'], m['sql_user_privileges'], m['query_id'], m['query_kind'], m['query_text'], m['query_hash'], m['query_parameterized_hash'], m['event_date'], m['event_time'], m['query_start_time'], m['query_duration_ms'], m['query_queued_duration_ms'], m['current_database'], m['written_rows'], m['written_bytes'], m['join_spilled_rows'], m['join_spilled_bytes'], m['agg_spilled_rows'], m['agg_spilled_bytes'], m['group_by_spilled_rows'], m['group_by_spilled_bytes'], m['written_io_bytes'], m['written_io_bytes_cost_ms'], m['scan_rows'], m['scan_bytes'], m['scan_io_bytes'], m['scan_io_bytes_cost_ms'], m['scan_partitions'], m['total_partitions'], m['result_rows'], m['result_bytes'], m['bytes_from_remote_disk'], m['bytes_from_local_disk'], m['bytes_from_memory'], m['client_address'], m['user_agent'], m['exception_code'], m['exception_text'], m['server_version'], m['query_tag'], m['has_profile'], m['peek_memory_usage'] FROM (SELECT parse_json(message) as m FROM system_history.log_history WHERE target='databend::log::query' AND batch_number >= {batch_begin} AND batch_number < {batch_end}))"
delete = "DELETE FROM system_history.query_history WHERE event_time < subtract_hours(NOW(), {retention_hours})"

[[tables]]
name = "profile_history"
target = "databend::log::profile"
create = "CREATE TABLE IF NOT EXISTS system_history.profile_history (timestamp TIMESTAMP NULL, query_id VARCHAR NULL, profiles VARIANT NULL, statistics_desc VARIANT NULL) CLUSTER BY (timestamp, query_id)"
transform = "INSERT INTO system_history.profile_history FROM (SELECT timestamp, m['query_id'], m['profiles'], m['statistics_desc'] FROM (SELECT timestamp, parse_json(message) as m FROM system_history.log_history WHERE target='databend::log::profile' AND batch_number >= {batch_begin} AND batch_number < {batch_end}))"
delete = "DELETE FROM system_history.profile_history WHERE timestamp < subtract_hours(NOW(), {retention_hours})"

[[tables]]
name = "login_history"
target = "databend::log::login"
create = "CREATE TABLE IF NOT EXISTS system_history.login_history (event_time TIMESTAMP NULL, handler STRING NULL, event_type STRING NULL, connection_uri STRING NULL, auth_type STRING NULL, user_name STRING NULL, client_ip STRING NULL, user_agent STRING NULL, session_id STRING NULL, node_id STRING NULL, error_message STRING NULL)"
transform = "INSERT INTO system_history.login_history FROM (SELECT m['event_time'], m['handler'], m['event_type'], m['connection_uri'], m['auth_type'], m['user_name'], m['client_ip'], m['user_agent'], m['session_id'], m['node_id'], m['error_message'] FROM (SELECT parse_json(message) as m FROM system_history.log_history WHERE target='databend::log::login' AND batch_number >= {batch_begin} AND batch_number < {batch_end}))"
delete = "DELETE FROM system_history.login_history WHERE event_time < subtract_hours(NOW(), {retention_hours})"