Skip to content

chore: tweak transient table data retention settings #15346

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/query/service/src/interpreters/interpreter_table_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,14 @@ impl Interpreter for DropTableInterpreter {
// thus if we do not refresh the table instance, `truncate` will fail
let latest = tbl.as_ref().refresh(self.ctx.as_ref()).await?;
let maybe_fuse_table = FuseTable::try_from_table(latest.as_ref());
// if target table if of type FuseTable, purge its historical data
// if target table is of type FuseTable, purge its historical data
// otherwise, plain truncate
if let Ok(fuse_table) = maybe_fuse_table {
fuse_table
.do_truncate(
self.ctx.clone(),
&mut build_res.main_pipeline,
TruncateMode::Purge,
TruncateMode::DropAllPurge,
)
.await?
} else {
Expand Down
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ impl DefaultSettings {
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=data_retention_time_in_days_max)),
}),
("transient_data_retention_time_in_minutes", DefaultSettingValue {
value: UserSettingValue::UInt64(60),
desc: "Sets the transient data retention time in minutes.",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..= 24 * 60)),
}),
("max_storage_io_requests", DefaultSettingValue {
value: UserSettingValue::UInt64(default_max_storage_io_requests),
desc: "Sets the maximum number of concurrent I/O requests.",
Expand Down
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ impl Settings {
self.try_get_u64("data_retention_time_in_days")
}

/// Reads the `transient_data_retention_time_in_minutes` setting as a `u64`.
///
/// The lookup is delegated to `try_get_u64`; any error it reports is
/// returned to the caller unchanged.
pub fn get_transient_data_retention_time_in_minutes(&self) -> Result<u64> {
    const KEY: &str = "transient_data_retention_time_in_minutes";
    self.try_get_u64(KEY)
}

pub fn get_max_storage_io_requests(&self) -> Result<u64> {
self.try_get_u64("max_storage_io_requests")
}
Expand Down
6 changes: 5 additions & 1 deletion src/query/storages/fuse/src/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,11 @@ impl Table for FuseTable {
keep_last_snapshot: bool,
dry_run: bool,
) -> Result<Option<Vec<String>>> {
match self.navigate_for_purge(&ctx, instant).await {
let by_pass_retention_check_for_nav_by_time_point = false;
match self
.navigate_for_purge(&ctx, instant, by_pass_retention_check_for_nav_by_time_point)
.await
{
Ok((table, files)) => {
table
.do_purge(&ctx, files, num_snapshot_limit, keep_last_snapshot, dry_run)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use databend_storages_common_table_meta::meta::TableSnapshot;
use crate::operations::common::ConflictResolveContext;

#[async_trait::async_trait]
pub trait SnapshotGenerator {
pub trait SnapshotGenerator: Sync {
/// Convert to `Any`, to enable dynamic casting.
fn as_any(&self) -> &dyn Any;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub enum TruncateMode {
// Delete the data, used for delete operation.
Delete,
// Truncate and purge the historical data.
Purge,
DropAllPurge,
}

#[derive(Clone)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use std::time::Instant;

use backoff::backoff::Backoff;
use backoff::ExponentialBackoff;
use chrono::DateTime;
use chrono::Utc;
use databend_common_catalog::lock::Lock;
use databend_common_catalog::table::NavigationPoint;
use databend_common_catalog::table::Table;
use databend_common_catalog::table::TableExt;
use databend_common_catalog::table_context::TableContext;
Expand Down Expand Up @@ -120,7 +123,7 @@ where F: SnapshotGenerator + Send + 'static
prev_snapshot_id: Option<SnapshotId>,
deduplicated_label: Option<String>,
) -> Result<ProcessorPtr> {
let purge = Self::do_purge(table, &snapshot_gen);
let purge = Self::need_purge(table, &snapshot_gen);
Ok(ProcessorPtr::create(Box::new(CommitSink {
state: State::None,
ctx,
Expand Down Expand Up @@ -188,18 +191,71 @@ where F: SnapshotGenerator + Send + 'static
Ok(Event::Async)
}

fn do_purge(table: &FuseTable, snapshot_gen: &F) -> bool {
fn need_purge(table: &FuseTable, snapshot_gen: &F) -> bool {
if table.transient() {
return true;
}

snapshot_gen
.as_any()
.downcast_ref::<TruncateGenerator>()
.is_some_and(|gen| matches!(gen.mode(), TruncateMode::Purge))
.is_some_and(|gen| matches!(gen.mode(), TruncateMode::DropAllPurge))
}

fn do_truncate(&self) -> bool {
/// Purges the historical data of `tbl`, keeping its last snapshot.
///
/// Lists every snapshot file of the table and hands the list to
/// `do_purge` as a real (non dry-run) purge with no snapshot limit.
/// Errors from listing or purging are propagated to the caller.
async fn purge_table(&self, tbl: &FuseTable) -> Result<()> {
    let all_snapshot_files = tbl.list_snapshot_files().await?;
    tbl.do_purge(
        &self.ctx,
        all_snapshot_files,
        None,  // no limit on the number of snapshots
        true,  // keep_last_snapshot
        false, // dry_run
    )
    .await?;
    Ok(())
}

/// Purges the historical data of a transient table, honoring the
/// `transient_data_retention_time_in_minutes` setting.
///
/// * `tbl` - the table to purge.
/// * `snapshot_timestamp` - reference timestamp; the purge time point is
///   this timestamp minus the configured retention minutes.
///
/// A retention of `0` minutes falls back to [`Self::purge_table`].
/// Otherwise the table is navigated to the computed time point, with the
/// retention-period check for time-point navigation bypassed, and the
/// candidate snapshots returned by that navigation are purged (last
/// snapshot kept, not a dry run).
async fn purge_transient_table(
    &self,
    tbl: &FuseTable,
    snapshot_timestamp: DateTime<Utc>,
) -> Result<()> {
    let settings = self.ctx.get_settings();
    let retention_minutes = settings.get_transient_data_retention_time_in_minutes()?;

    match retention_minutes {
        // if transient_data_retention_time_in_minutes is set to 0,
        // fallback to normal purge (which is slightly faster)
        0 => self.purge_table(tbl).await,
        minutes => {
            let purge_before = snapshot_timestamp - chrono::Duration::minutes(minutes as i64);
            let nav_point = Some(NavigationPoint::TimePoint(purge_before));

            let (navigated, candidate_snapshots) = tbl
                .navigate_for_purge(
                    &self.ctx,
                    nav_point,
                    true, // by-pass the retention period checking
                )
                .await?;

            navigated
                .do_purge(
                    &self.ctx,
                    candidate_snapshots,
                    None,  // no limit on the number of snapshots
                    true,  // keep_last_snapshot
                    false, // dry_run
                )
                .await?;

            Ok(())
        }
    }
}

fn need_truncate(&self) -> bool {
self.snapshot_gen
.as_any()
.downcast_ref::<TruncateGenerator>()
Expand Down Expand Up @@ -351,6 +407,7 @@ where F: SnapshotGenerator + Send + 'static

self.dal.write(&location, data).await?;

let snapshot_timestamp = snapshot.timestamp.unwrap();
let catalog = self.ctx.get_catalog(table_info.catalog()).await?;
match FuseTable::update_table_meta(
catalog.clone(),
Expand All @@ -366,7 +423,7 @@ where F: SnapshotGenerator + Send + 'static
.await
{
Ok(_) => {
if self.do_truncate() {
if self.need_truncate() {
catalog
.truncate_table(&table_info, TruncateTableReq {
table_id: table_info.ident.table_id,
Expand All @@ -380,32 +437,34 @@ where F: SnapshotGenerator + Send + 'static
let latest = self.table.refresh(self.ctx.as_ref()).await?;
let tbl = FuseTable::try_from_table(latest.as_ref())?;

warn!(
"table detected, purging historical data. ({})",
tbl.table_info.ident
info!(
"purging historical data. (name{}, id {})",
tbl.table_info.name, tbl.table_info.ident
);

let keep_last_snapshot = true;
let snapshot_files = tbl.list_snapshot_files().await?;
if let Err(e) = tbl
.do_purge(
&self.ctx,
snapshot_files,
None,
keep_last_snapshot,
false,
)
.await
{
// Errors of GC, if any, are ignored, since GC task can be picked up
warn!(
"GC of table not success (this is not a permanent error). the error : {}",
e
);
// purge table, swallow errors
let table_is_transient = tbl.transient();
let res = if table_is_transient {
self.purge_transient_table(tbl, snapshot_timestamp).await
} else {
info!("GC of table done");
self.purge_table(tbl).await
};

match res {
Err(e) => warn!(
"purge table (name: {}, id: {}, transient: {}) failed (non-permanent error). the error : {}",
tbl.table_info.name,
tbl.table_info.ident,
table_is_transient,
e
),
Ok(()) => info!(
"purge table done. (name: {}, id: {}, transient: {})",
tbl.table_info.name, tbl.table_info.ident, table_is_transient,
),
}
}

metrics_inc_commit_mutation_success();
{
let elapsed_time = self.start_time.elapsed().as_millis();
Expand Down
27 changes: 18 additions & 9 deletions src/query/storages/fuse/src/operations/navigate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,8 @@ impl FuseTable {
&self,
ctx: &Arc<dyn TableContext>,
instant: Option<NavigationPoint>,
by_pass_retention_check_for_nav_by_time_point: bool,
) -> Result<(Arc<FuseTable>, Vec<String>)> {
let retention =
Duration::days(ctx.get_settings().get_data_retention_time_in_days()? as i64);
let root_snapshot = if let Some(snapshot) = self.read_table_snapshot().await? {
snapshot
} else {
Expand All @@ -211,22 +210,32 @@ impl FuseTable {
};

assert!(root_snapshot.timestamp.is_some());
let mut time_point = root_snapshot.timestamp.unwrap() - retention;
let retention =
Duration::days(ctx.get_settings().get_data_retention_time_in_days()? as i64);
let min_time_point = root_snapshot.timestamp.unwrap() - retention;

let (location, files) = match instant {
Some(NavigationPoint::TimePoint(point)) => {
time_point = std::cmp::min(point, time_point);
self.list_by_time_point(time_point).await
let nav_time_point = if by_pass_retention_check_for_nav_by_time_point {
point
} else {
std::cmp::max(point, min_time_point)
};
self.list_by_time_point(nav_time_point).await
}
Some(NavigationPoint::SnapshotID(snapshot_id)) => {
self.list_by_snapshot_id(snapshot_id.as_str(), time_point)
self.list_by_snapshot_id(snapshot_id.as_str(), min_time_point)
.await
}
Some(NavigationPoint::StreamInfo(info)) => self.list_by_stream(info, time_point).await,
None => self.list_by_time_point(time_point).await,
Some(NavigationPoint::StreamInfo(info)) => {
self.list_by_stream(info, min_time_point).await
}
None => self.list_by_time_point(min_time_point).await,
}?;

let table = self.navigate_to_time_point(location, time_point).await?;
let table = self
.navigate_to_time_point(location, min_time_point)
.await?;

Ok((table, files))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ USE db1
statement ok
CREATE TRANSIENT TABLE IF NOT EXISTS t09_0016(a int)

statement ok
set transient_data_retention_time_in_minutes = 0;

statement ok
INSERT INTO t09_0016 VALUES(1)

Expand All @@ -27,7 +30,7 @@ select * from t09_0016 order by a
3

query B
select count(*)=1 from fuse_snapshot('db1', 't09_0016')
select count(*) from fuse_snapshot('db1', 't09_0016')
----
1

Expand Down
Loading