
CDF not working due to vacuum deleting files that are still referenced by logs #3392

Open
@swanandx

Description


Environment

Delta-rs version: 0.25

Binding: Rust

Environment:

  • Cloud provider: Local FS / Any
  • OS: MacOS
  • Other: _

Bug

What happened:

Running VACUUM deleted data files that were still referenced by versions present in the log.

This happens when the file retention is shorter than the log retention, e.g.

LogRetentionDuration = "interval 10 seconds"
DeletedFileRetentionDuration = "interval 5 seconds"

After 5 seconds the data files get deleted, but the version .json file still exists under _delta_log, so that version is returned as the earliest version. When we then try to load it, the load fails because the data files are gone!
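
For reference, these are the two retention properties exactly as they are set in the PoC below; the very short intervals are only there to make the race easy to hit:

.with_configuration_property(
    deltalake::TableProperty::LogRetentionDuration,
    Some("interval 10 seconds"),
)
.with_configuration_property(
    deltalake::TableProperty::DeletedFileRetentionDuration,
    Some("interval 5 seconds"),
)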

What you expected to happen:

Either get_earliest_version should return the earliest version whose data files still exist, or CDF should skip the versions for which the files were deleted instead of failing to load.
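
In the meantime, a possible caller-side workaround is to probe forward from get_earliest_version until a CDF scan succeeds. This is only a minimal sketch, reusing the APIs from the PoC below; the helper name is hypothetical, and it assumes DeltaTable::version() yields the latest loaded version as an i64 (as in delta-rs 0.25):

// Hedged sketch of a workaround, not a fix: walk forward from the reported
// earliest version until a CDF scan no longer touches vacuumed files.
async fn first_readable_cdf_version(table: &deltalake::DeltaTable) -> i64 {
    let earliest = table.get_earliest_version().await.unwrap();
    let latest = table.version(); // assumption: version() -> i64 in delta-rs 0.25
    for v in earliest..=latest {
        let cdf = deltalake::DeltaOps(table.clone())
            .load_cdf()
            .with_starting_version(v);
        let provider = std::sync::Arc::new(
            deltalake::delta_datafusion::DeltaCdfTableProvider::try_new(cdf).unwrap(),
        );
        let df = deltalake::datafusion::prelude::SessionContext::new()
            .read_table(provider)
            .unwrap();
        // collect() materializes the scan, so missing data files surface here
        if df.collect().await.is_ok() {
            return v;
        }
    }
    latest
}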

How to reproduce it:

PoC code
use deltalake::{
    arrow::datatypes::{DataType, Field, Schema, TimeUnit},
    datafusion::prelude::SessionContext,
    delta_datafusion::DeltaCdfTableProvider,
    operations::optimize::OptimizeType,
    writer::utils::record_batch_from_message,
    DeltaOps,
};
use tokio::time::sleep;

use std::{sync::Arc, time::Duration};

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), deltalake::errors::DeltaTableError> {
    let v = serde_json::json!({
        "timestamp": 1743414195553000_i64,
    });

    let dest_schema = Arc::new(Schema::new(vec![Field::new(
        "timestamp",
        DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
        false,
    )]));

    let batch = record_batch_from_message(dest_schema, &[v]).unwrap();

    // let ops = DeltaOps::new_in_memory();
    let ops = DeltaOps::try_from_uri("./mylake").await.unwrap();

    let mut table = ops
        .create()
        .with_column(
            "timestamp",
            deltalake::kernel::DataType::TIMESTAMP,
            true,
            None,
        )
        .with_configuration_property(deltalake::TableProperty::EnableChangeDataFeed, Some("true"))
        .with_configuration_property(deltalake::TableProperty::MinReaderVersion, Some("3"))
        .with_configuration_property(deltalake::TableProperty::MinWriterVersion, Some("7"))
        .with_configuration_property(
            deltalake::TableProperty::LogRetentionDuration,
            Some("interval 10 seconds"),
        )
        .with_configuration_property(
            deltalake::TableProperty::DeletedFileRetentionDuration,
            Some("interval 5 seconds"),
        )
        .with_configuration_property(deltalake::TableProperty::CheckpointInterval, Some("5"))
        .with_table_name("my_table")
        .await?;

    table.load().await.unwrap();

    // write some data
    for _ in 0..2 {
        table = DeltaOps(table.clone())
            .write(vec![batch.clone()])
            .await
            .unwrap();
    }

    table.load().await.unwrap();

    let (table, metrics) = DeltaOps(table)
        .optimize()
        .with_type(OptimizeType::Compact)
        .await?;
    println!("OPTIMIZE: {:?}", metrics);
    println!("---x---");

    // wait > 5 secs so that DeletedFileRetentionDuration has elapsed
    sleep(Duration::from_millis(5500)).await;

    let (mut table, metrics) = DeltaOps(table).vacuum().with_dry_run(false).await?;
    println!("VACCUM: {:?}", metrics);
    println!("---x---");

    // even though the _data files_ were deleted, the logs ( .json files ) still exist,
    // so get_earliest_version returns 0
    let earliest_version = table.get_earliest_version().await.unwrap();
    println!(">> earliest_version: {earliest_version}");
    assert_eq!(0, earliest_version);
    let cdf = DeltaOps(table.clone())
        .load_cdf()
        .with_starting_version(earliest_version);

    let provider = Arc::new(DeltaCdfTableProvider::try_new(cdf).unwrap());
    let df = SessionContext::new().read_table(provider).unwrap();

    // this fails because it tries to read the files referenced in 0000000.json,
    // but those were optimized and vacuumed above
    if let Err(e) = df.show().await {
        println!("[!] ERROR: Failed to read:  {e}")
    };
    println!("---x---");

    // wait 10 secs so that LogRetentionDuration has elapsed
    sleep(Duration::from_secs(10)).await;

    // write data so that log cleanup will be triggered
    table = DeltaOps(table.clone())
        .write(vec![batch.clone()])
        .await
        .unwrap();

    let earliest_version = table.get_earliest_version().await.unwrap();

    // earliest_version won't be 0 now because LogRetentionDuration has elapsed
    // and checkpoints of later versions have been created,
    // so the older log files were cleaned up
    assert_ne!(0, earliest_version);
    println!(">> earliest_version after LogRetentionDuration elapsed: {earliest_version}");
    let cdf = DeltaOps(table.clone())
        .load_cdf()
        .with_starting_version(earliest_version);

    let provider = Arc::new(DeltaCdfTableProvider::try_new(cdf).unwrap());
    let df = SessionContext::new().read_table(provider).unwrap();

    // this succeeds because the log files of the older versions
    // have now been deleted as well.
    assert!(df.show().await.is_ok());

    Ok(())
}

More details:

Keeping a version visible in the log even after its data files are deleted does appear to be per spec, since the two are controlled by different retention durations; however, deleting files that are still referenced by a valid version doesn't seem ideal.
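
Purely as a sketch of the idea (not the actual vacuum code path), one way to avoid this would be to clamp the effective file-retention window so it is never shorter than the log-retention window:

use std::time::Duration;

// Conceptual sketch only: data files referenced by log entries that are still
// retained should survive at least as long as those log entries.
fn effective_file_retention(deleted_file_retention: Duration, log_retention: Duration) -> Duration {
    deleted_file_retention.max(log_retention)
}

// With the PoC settings this would be max(5s, 10s) = 10s, so files referenced
// by still-present log versions would not be vacuumed before the logs expire.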

Please feel free to comment and LMK, thanks :)
