Commit 04f5733

refactor: refine system history tables and add login table

1 parent 0a6673b commit 04f5733

31 files changed, +881 −1410 lines

.github/workflows/reuse.linux.yml

Lines changed: 4 additions & 0 deletions

@@ -141,6 +141,10 @@ jobs:
     needs: [build, check]
     steps:
       - uses: actions/checkout@v4
+      - uses: ./.github/actions/setup_license
+        with:
+          runner_provider: ${{ inputs.runner_provider }}
+          type: ${{ inputs.license_type }}
       - uses: ./.github/actions/test_logs
         timeout-minutes: 20

Cargo.lock

Lines changed: 1 addition & 0 deletions
(Generated file; diff not rendered.)

scripts/ci/deploy/config/databend-query-node-otlp-logs.toml

Lines changed: 10 additions & 1 deletion

@@ -65,10 +65,19 @@ otlp_protocol = "http"
 pkey1 = "pvalue1"
 pkey2 = "pvalue2"
 
-[log.persistentlog]
+[log.history]
 on = true
 level = "INFO"
 
+[[log.history.tables]]
+table_name = "query_history"
+
+[[log.history.tables]]
+table_name = "profile_history"
+
+[[log.history.tables]]
+table_name = "login_history"
+
 [meta]
 endpoints = ["0.0.0.0:9191"]
 username = "root"
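Each entry under [[log.history.tables]] can also carry its own retention window. A minimal sketch of such an override (illustrative, not part of this CI config; the retention field is in hours and defaults to 168, per HistoryTableConfig below):

[[log.history.tables]]
table_name = "query_history"
# hypothetical override: keep only 72 hours of query history
retention = 72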

src/binaries/query/entry.rs

Lines changed: 4 additions & 4 deletions

@@ -32,8 +32,8 @@ use databend_common_version::DATABEND_GIT_SEMVER;
 use databend_common_version::DATABEND_GIT_SHA;
 use databend_common_version::DATABEND_SEMVER;
 use databend_query::clusters::ClusterDiscovery;
+use databend_query::history_tables::GlobalHistoryLog;
 use databend_query::local;
-use databend_query::persistent_log::GlobalPersistentLog;
 use databend_query::servers::admin::AdminService;
 use databend_query::servers::flight::FlightService;
 use databend_query::servers::metrics::MetricService;
@@ -291,9 +291,9 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {
     if conf.log.structlog.on {
         println!(" structlog: {}", conf.log.structlog);
     }
-    if conf.log.persistentlog.on {
-        GlobalPersistentLog::instance().initialized();
-        println!(" persistentlog: {}", conf.log.persistentlog);
+    if conf.log.history.on {
+        GlobalHistoryLog::instance().initialized();
+        println!(" system history tables: {}", conf.log.history);
     }
 
     println!();

src/common/tracing/Cargo.toml

Lines changed: 1 addition & 0 deletions

@@ -30,6 +30,7 @@ opentelemetry_sdk = { workspace = true }
 parquet = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
+toml = { workspace = true }
 tonic = { workspace = true }
 
 [dev-dependencies]

src/common/tracing/src/config.rs

Lines changed: 29 additions & 10 deletions

@@ -16,6 +16,8 @@ use std::collections::BTreeMap;
 use std::fmt::Display;
 use std::fmt::Formatter;
 
+use itertools::Itertools;
+
 // use the uncommon usage of Pascal Case level name
 // to partially avoid the feature of serfig that can not override with the default value.
 // see https://github.com/Xuanwo/serfig/issues/23 for detail.
@@ -31,7 +33,7 @@ pub struct Config {
     pub profile: ProfileLogConfig,
     pub structlog: StructLogConfig,
     pub tracing: TracingConfig,
-    pub persistentlog: PersistentLogConfig,
+    pub history: HistoryConfig,
 }
 
 impl Config {
@@ -336,42 +338,59 @@ impl Default for OTLPEndpointConfig {
 }
 
 #[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)]
-pub struct PersistentLogConfig {
+pub struct HistoryConfig {
     pub on: bool,
     pub interval: usize,
     pub stage_name: String,
     pub level: String,
-    pub retention: usize,
     pub retention_interval: usize,
+    pub tables: Vec<HistoryTableConfig>,
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)]
+pub struct HistoryTableConfig {
+    pub table_name: String,
+    pub retention: usize,
 }
 
-impl Display for PersistentLogConfig {
+impl Display for HistoryConfig {
     fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
         write!(
             f,
-            "enabled={}, interval={}, stage_name={}, level={}, retention={}, retention_interval={}",
+            "enabled={}, interval={}, stage_name={}, level={}, retention_interval={}, tables=[{}]",
             self.on,
             self.interval,
             self.stage_name,
             self.level,
-            self.retention,
-            self.retention_interval
+            self.retention_interval,
+            self.tables
+                .iter()
+                .map(|f| format!("{}({} hours)", f.table_name.clone(), f.retention))
+                .join(", ")
         )
     }
 }
 
-impl Default for PersistentLogConfig {
+impl Default for HistoryConfig {
     fn default() -> Self {
         Self {
             on: false,
             interval: 2,
             // The default value of stage name uses an uuid to avoid conflicts with existing stages
             stage_name: "log_1f93b76af0bd4b1d8e018667865fbc65".to_string(),
             level: "WARN".to_string(),
-            // Data older than 72 hours will be deleted during retention tasks
-            retention: 72,
             // Trigger the retention task every 24 hours
             retention_interval: 24,
+            tables: vec![],
+        }
+    }
+}
+
+impl Default for HistoryTableConfig {
+    fn default() -> Self {
+        Self {
+            table_name: "".to_string(),
+            retention: 168,
        }
    }
}
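With retention now tracked per table, the Display output for HistoryConfig lists each table with its retention; for example (a sketch assuming on = true and a single enabled table left at the 168-hour default):

enabled=true, interval=2, stage_name=log_1f93b76af0bd4b1d8e018667865fbc65, level=WARN, retention_interval=24, tables=[query_history(168 hours)]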

src/common/tracing/src/init.rs

Lines changed: 12 additions & 16 deletions

@@ -34,6 +34,7 @@ use crate::config::OTLPProtocol;
 use crate::filter::filter_by_thread_tracker;
 use crate::loggers::get_layout;
 use crate::loggers::new_rolling_file_appender;
+use crate::predefined_tables::table_to_target;
 use crate::remote_log::RemoteLog;
 use crate::structlog::StructLogReporter;
 use crate::Config;
@@ -386,31 +387,26 @@ pub fn init_logging(
             .append(structlog_log_file);
         logger = logger.dispatch(dispatch);
     }
-
-    if cfg.persistentlog.on {
+    if cfg.history.on {
         let (remote_log, flush_guard) =
             RemoteLog::new(&labels, cfg).expect("initialize remote logger");
 
         let mut filter_builder =
             EnvFilterBuilder::new().filter(Some("databend::log::structlog"), LevelFilter::Off);
 
-        if cfg.profile.on {
-            filter_builder =
-                filter_builder.filter(Some("databend::log::profile"), LevelFilter::Trace);
-        } else {
-            filter_builder =
-                filter_builder.filter(Some("databend::log::profile"), LevelFilter::Off);
+        let mut table_to_target = table_to_target();
+
+        for table_cfg in cfg.history.tables.iter() {
+            if let Some(target) = table_to_target.remove(&table_cfg.table_name) {
+                filter_builder = filter_builder.filter(Some(&target), LevelFilter::Trace);
+            }
         }
-        if cfg.query.on {
-            filter_builder =
-                filter_builder.filter(Some("databend::log::query"), LevelFilter::Trace);
-        } else {
-            filter_builder = filter_builder.filter(Some("databend::log::query"), LevelFilter::Off);
+        for (_, target) in table_to_target {
+            filter_builder = filter_builder.filter(Some(&target), LevelFilter::Off);
         }
+
         let dispatch = Dispatch::new()
-            .filter(EnvFilter::new(
-                filter_builder.parse(&cfg.persistentlog.level),
-            ))
+            .filter(EnvFilter::new(filter_builder.parse(&cfg.history.level)))
             .filter(filter_by_thread_tracker())
             .append(remote_log);

src/common/tracing/src/lib.rs

Lines changed: 6 additions & 3 deletions

@@ -26,15 +26,18 @@ mod panic_hook;
 mod remote_log;
 mod structlog;
 
+mod predefined_tables;
+
 pub use crash_hook::pipe_file;
 pub use crash_hook::SignalListener;
 
 pub use crate::config::Config;
 pub use crate::config::FileConfig;
+pub use crate::config::HistoryConfig;
+pub use crate::config::HistoryTableConfig;
 pub use crate::config::OTLPConfig;
 pub use crate::config::OTLPEndpointConfig;
 pub use crate::config::OTLPProtocol;
-pub use crate::config::PersistentLogConfig;
 pub use crate::config::ProfileLogConfig;
 pub use crate::config::QueryLogConfig;
 pub use crate::config::StderrConfig;
@@ -48,16 +51,16 @@ pub use crate::init::start_trace_for_remote_request;
 pub use crate::init::GlobalLogger;
 pub use crate::panic_hook::log_panic;
 pub use crate::panic_hook::set_panic_hook;
+pub use crate::predefined_tables::init_history_tables;
+pub use crate::predefined_tables::HistoryTable;
 pub use crate::remote_log::convert_to_batch;
 pub use crate::remote_log::LogBuffer as RemoteLogBuffer;
 pub use crate::remote_log::LogMessage;
 pub use crate::remote_log::RemoteLog;
 pub use crate::remote_log::RemoteLogElement;
 pub use crate::remote_log::RemoteLogGuard;
-pub use crate::remote_log::PERSISTENT_LOG_SCHEMA_VERSION;
 pub use crate::structlog::DummyReporter;
 pub use crate::structlog::StructLogReporter;
-
 pub fn closure_name<F: std::any::Any>() -> &'static str {
     let func_path = std::any::type_name::<F>();
     func_path
src/common/tracing/src/predefined_tables.rs (new file)

Lines changed: 118 additions & 0 deletions

@@ -0,0 +1,118 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::BTreeMap;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use databend_common_exception::ErrorCode;
+use databend_common_exception::Result;
+
+use crate::HistoryConfig;
+
+const TABLES_TOML: &str = include_str!("./history_tables.toml");
+
+#[derive(Debug)]
+pub struct HistoryTable {
+    pub name: String,
+    pub create: String,
+    pub transform: String,
+    pub delete: String,
+}
+
+impl HistoryTable {
+    pub fn create(predefined: PredefinedTable, retention: u64) -> Self {
+        HistoryTable {
+            name: predefined.name,
+            create: predefined.create,
+            transform: predefined.transform,
+            delete: predefined
+                .delete
+                .replace("{retention_hours}", &retention.to_string()),
+        }
+    }
+
+    pub fn assemble_log_history_transform(&self, stage_name: &str, batch_number: u64) -> String {
+        let mut transform = self.transform.clone();
+        transform = transform.replace("{stage_name}", stage_name);
+        transform = transform.replace("{batch_number}", &batch_number.to_string());
+        transform
+    }
+
+    pub fn assemble_normal_transform(&self, begin: u64, end: u64) -> String {
+        let mut transform = self.transform.clone();
+        transform = transform.replace("{batch_begin}", &begin.to_string());
+        transform = transform.replace("{batch_end}", &end.to_string());
+        transform
+    }
+}
+
+#[derive(serde::Deserialize)]
+pub struct PredefinedTables {
+    pub tables: Vec<PredefinedTable>,
+}
+
+#[derive(serde::Deserialize, Ord, PartialOrd, Eq, PartialEq)]
+pub struct PredefinedTable {
+    pub name: String,
+    pub target: String,
+    pub create: String,
+    pub transform: String,
+    pub delete: String,
+}
+
+pub fn init_history_tables(cfg: &HistoryConfig) -> Result<Vec<Arc<HistoryTable>>> {
+    let predefined_tables: PredefinedTables =
+        toml::from_str(TABLES_TOML).expect("Failed to parse toml");
+
+    let mut predefined_map: BTreeMap<String, PredefinedTable> = BTreeMap::from_iter(
+        predefined_tables
+            .tables
+            .into_iter()
+            .map(|table| (table.name.clone(), table)),
+    );
+
+    let mut history_tables = Vec::with_capacity(cfg.tables.len());
+    history_tables.push(Arc::new(HistoryTable::create(
+        predefined_map.remove("log_history").unwrap(),
+        24 * 7,
+    )));
+    for enable_table in cfg.tables.iter() {
+        if let Some(predefined_table) = predefined_map.remove(&enable_table.table_name) {
+            let retention = enable_table.retention;
+            history_tables.push(Arc::new(HistoryTable::create(
+                predefined_table,
+                retention as u64,
+            )));
+        } else {
+            return Err(ErrorCode::InvalidConfig(format!(
+                "Invalid history table name {}",
+                enable_table.table_name
+            )));
+        }
+    }
+    Ok(history_tables)
+}
+
+pub fn table_to_target() -> HashMap<String, String> {
+    let predefined_tables: PredefinedTables =
+        toml::from_str(TABLES_TOML).expect("Failed to parse toml");
+    let mut table_to_target = HashMap::new();
+    for table in predefined_tables.tables {
+        if table.name != "log_history" {
+            table_to_target.insert(table.name, table.target);
+        }
+    }
+    table_to_target
+}
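A usage sketch for the two entry points above (hypothetical caller, not code from this commit; the crate name databend_common_tracing is an assumption based on the src/common/tracing path): init_history_tables always materializes log_history first, pinned to 24 * 7 hours of retention, then appends one table per configured entry, and rejects unknown names with InvalidConfig.

// Assumed crate names; the exports match lib.rs above.
use databend_common_exception::Result;
use databend_common_tracing::{init_history_tables, HistoryConfig, HistoryTableConfig};

fn main() -> Result<()> {
    let cfg = HistoryConfig {
        on: true,
        tables: vec![HistoryTableConfig {
            table_name: "query_history".to_string(),
            retention: 72, // hours, substituted into the delete template
        }],
        ..Default::default()
    };

    let tables = init_history_tables(&cfg)?;
    assert_eq!(tables[0].name, "log_history"); // always present
    assert_eq!(tables[1].name, "query_history");

    // Fill the batch placeholders for one transform run over batches [0, 10).
    let sql = tables[1].assemble_normal_transform(0, 10);
    println!("{sql}");
    Ok(())
}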
src/common/tracing/src/history_tables.toml (new file)

Lines changed: 27 additions & 0 deletions

@@ -0,0 +1,27 @@
+[[tables]]
+name = "log_history"
+target = ""
+create = "CREATE TABLE IF NOT EXISTS system_history.log_history (timestamp TIMESTAMP NULL, path STRING NULL, target STRING NULL, log_level STRING NULL, cluster_id STRING NULL, node_id STRING NULL, warehouse_id STRING NULL, query_id STRING NULL, message STRING NULL, fields VARIANT NULL, batch_number Int64) CLUSTER BY LINEAR(timestamp, query_id)"
+transform = "COPY INTO system_history.log_history FROM (SELECT timestamp, path, target, log_level, cluster_id,node_id, warehouse_id, query_id, message, fields, {batch_number} FROM @{stage_name}) file_format = (TYPE = PARQUET) PURGE = TRUE"
+delete = "DELETE FROM system_history.log_history WHERE timestamp < subtract_hours(NOW(), {retention_hours})"
+
+[[tables]]
+name = "query_history"
+target = "databend::log::query"
+create = "CREATE TABLE IF NOT EXISTS system_history.query_history (log_type Int8 NULL, log_type_name STRING NULL, handler_type STRING NULL, tenant_id STRING NULL, cluster_id STRING NULL, node_id STRING NULL, sql_user STRING NULL, sql_user_quota STRING NULL, sql_user_privileges STRING NULL, query_id STRING NULL, query_kind STRING NULL, query_text STRING NULL, query_hash STRING NULL, query_parameterized_hash STRING NULL, event_date DATE NULL, event_time TIMESTAMP NULL, query_start_time TIMESTAMP NULL, query_duration_ms Int64 NULL, query_queued_duration_ms Int64 NULL, current_database STRING NULL, written_rows UInt64 NULL, written_bytes UInt64 NULL, join_spilled_rows UInt64 NULL, join_spilled_bytes UInt64 NULL, agg_spilled_rows UInt64 NULL, agg_spilled_bytes UInt64 NULL, group_by_spilled_rows UInt64 NULL, group_by_spilled_bytes UInt64 NULL, written_io_bytes UInt64 NULL, written_io_bytes_cost_ms UInt64 NULL, scan_rows UInt64 NULL, scan_bytes UInt64 NULL, scan_io_bytes UInt64 NULL, scan_io_bytes_cost_ms UInt64 NULL, scan_partitions UInt64 NULL, total_partitions UInt64 NULL, result_rows UInt64 NULL, result_bytes UInt64 NULL, bytes_from_remote_disk UInt64 NULL, bytes_from_local_disk UInt64 NULL, bytes_from_memory UInt64 NULL, client_address STRING NULL, user_agent STRING NULL, exception_code Int32 NULL, exception_text STRING NULL, server_version STRING NULL, query_tag STRING NULL, has_profile BOOLEAN NULL, peek_memory_usage VARIANT NULL) CLUSTER BY LINEAR(event_time, query_id)"
+transform = "INSERT INTO system_history.query_history FROM (SELECT m['log_type'], m['log_type_name'], m['handler_type'], m['tenant_id'], m['cluster_id'], m['node_id'], m['sql_user'], m['sql_user_quota'], m['sql_user_privileges'], m['query_id'], m['query_kind'], m['query_text'], m['query_hash'], m['query_parameterized_hash'], m['event_date'], m['event_time'], m['query_start_time'], m['query_duration_ms'], m['query_queued_duration_ms'], m['current_database'], m['written_rows'], m['written_bytes'], m['join_spilled_rows'], m['join_spilled_bytes'], m['agg_spilled_rows'], m['agg_spilled_bytes'], m['group_by_spilled_rows'], m['group_by_spilled_bytes'], m['written_io_bytes'], m['written_io_bytes_cost_ms'], m['scan_rows'], m['scan_bytes'], m['scan_io_bytes'], m['scan_io_bytes_cost_ms'], m['scan_partitions'], m['total_partitions'], m['result_rows'], m['result_bytes'], m['bytes_from_remote_disk'], m['bytes_from_local_disk'], m['bytes_from_memory'], m['client_address'], m['user_agent'], m['exception_code'], m['exception_text'], m['server_version'], m['query_tag'], m['has_profile'], m['peek_memory_usage'] FROM (SELECT parse_json(message) as m FROM system_history.log_history WHERE target='databend::log::query' AND batch_number >= {batch_begin} AND batch_number < {batch_end}))"
+delete = "DELETE FROM system_history.query_history WHERE event_time < subtract_hours(NOW(), {retention_hours})"
+
+[[tables]]
+name = "profile_history"
+target = "databend::log::profile"
+create = "CREATE TABLE IF NOT EXISTS system_history.profile_history (timestamp TIMESTAMP NULL, query_id VARCHAR NULL, profiles VARIANT NULL, statistics_desc VARIANT NULL) CLUSTER BY (timestamp, query_id)"
+transform = "INSERT INTO system_history.profile_history FROM (SELECT timestamp, m['query_id'], m['profiles'], m['statistics_desc'] FROM (SELECT timestamp, parse_json(message) as m FROM system_history.log_history WHERE target='databend::log::profile' AND batch_number >= {batch_begin} AND batch_number < {batch_end}))"
+delete = "DELETE FROM system_history.profile_history WHERE timestamp < subtract_hours(NOW(), {retention_hours})"
+
+[[tables]]
+name = "login_history"
+target = "databend::log::login"
+create = "CREATE TABLE IF NOT EXISTS system_history.login_history (event_time TIMESTAMP NULL, handler STRING NULL, event_type STRING NULL, connection_uri STRING NULL, auth_type STRING NULL, user_name STRING NULL, client_ip STRING NULL, user_agent STRING NULL, session_id STRING NULL, node_id STRING NULL, error_message STRING NULL)"
+transform = "INSERT INTO system_history.login_history FROM (SELECT m['event_time'], m['handler'], m['event_type'], m['connection_uri'], m['auth_type'], m['user_name'], m['client_ip'], m['user_agent'], m['session_id'], m['node_id'], m['error_message'] FROM (SELECT parse_json(message) as m FROM system_history.log_history WHERE target='databend::log::login' AND batch_number >= {batch_begin} AND batch_number < {batch_end}))"
+delete = "DELETE FROM system_history.login_history WHERE event_time < subtract_hours(NOW(), {retention_hours})"

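To make the placeholders concrete: after substitution these templates become plain SQL. A sketch with illustrative values (stage name from the HistoryConfig default, batch_number = 5, batch window [0, 10), and retention = 72 via HistoryTable::create):

-- assemble_log_history_transform("log_1f93b76af0bd4b1d8e018667865fbc65", 5):
COPY INTO system_history.log_history FROM (SELECT timestamp, path, target, log_level, cluster_id,node_id, warehouse_id, query_id, message, fields, 5 FROM @log_1f93b76af0bd4b1d8e018667865fbc65) file_format = (TYPE = PARQUET) PURGE = TRUE

-- query_history delete with {retention_hours} -> 72:
DELETE FROM system_history.query_history WHERE event_time < subtract_hours(NOW(), 72)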