
Full partition delete generates data into **_change_data** directory. #3687

@detoxifiedplant

Description


Environment

Delta-rs version:
0.27.0

Binding:
Rust

Environment:

  • Cloud provider:
  • OS: MacOS
  • Other:

Bug

What happened:
Running a delete query that covers a full partition while Change Data Feed is enabled generates data in the _change_data directory.

What you expected to happen:
As per the documentation, a full partition delete should not generate data in the _change_data directory.
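
For reference, a minimal sketch (standard library only, table path taken from the reproduction below) for checking whether the directory was populated after the delete; with a full partition delete the expectation is that it is absent or empty:

use std::fs;

fn main() {
    // Path assumed from the reproduction below; adjust if the table lives elsewhere.
    let cdc_dir = "/tmp/users_delta_table/_change_data/";
    match fs::read_dir(cdc_dir) {
        Ok(entries) => {
            let files: Vec<_> = entries.filter_map(Result::ok).collect();
            println!("_change_data holds {} file(s)", files.len());
            for entry in &files {
                println!("  {}", entry.path().display());
            }
        }
        Err(_) => println!("_change_data directory was not created"),
    }
}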

How to reproduce it:

Example
use std::fs;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use chrono::NaiveDate;
use datafusion::prelude::*;
use deltalake::arrow::array::{Date32Array, Int64Array, RecordBatch};
use deltalake::arrow::datatypes::{DataType, Field, Schema};
use deltalake::kernel::{PrimitiveType, StructField, StructType};
use deltalake::parquet::file::properties::WriterProperties;
use deltalake::{DeltaOps, DeltaTable};
use tokio::time::sleep;

const TABLE_PATH: &str = "/tmp/users_delta_table/";
const TEST_DATE: &str = "2025-04-12";
const ROWS_PER_ITERATION: usize = 200_000;
const TOTAL_ITERATIONS: usize = 5;

#[tokio::main]
async fn main() -> Result<()> {
    if Path::new(TABLE_PATH).exists() {
        fs::remove_dir_all(TABLE_PATH)?;
        println!("Cleaned up existing table at {}", TABLE_PATH);
    }

    // create table
    let table = create_users_table().await?;

    // Verify partition columns
    assert_eq!(
        table.metadata()?.partition_columns(),
        &vec!["date"],
        "Partition columns not set correctly"
    );

    println!(
        "Partition information: {:?}",
        table.metadata()?.partition_columns(),
    );

    // insert data for same partition(date column)
    let table = insert_test_data(table).await?;
    println!(
        "Number of inserted rows {}",
        ROWS_PER_ITERATION * TOTAL_ITERATIONS,
    );

    // confirm number of inserted records
    let records = records_count(&table).await?;
    assert_eq!(ROWS_PER_ITERATION * TOTAL_ITERATIONS, records as usize);

    // delete records by partition value (same date value)
    let table = delete_partition_data(table).await?;

    println!("Sleeping for 5 seconds",);
    sleep(Duration::from_secs(5)).await;
    // perform optimization and vacuum
    let table = run_optimize_and_vacuum(table).await?;

    // count number of records
    let records = records_count(&table).await?;
    assert_eq!(0, records);
    println!("Number of current records {}", records);
    Ok(())
}

async fn create_users_table() -> Result<DeltaTable> {
    let schema = StructType::new(vec![
        StructField::new("id", PrimitiveType::Long, false),
        StructField::new("date", PrimitiveType::Date, false),
    ]);
    let table = deltalake::DeltaOps::try_from_uri(TABLE_PATH)
        .await?
        .create()
        .with_columns(schema.fields().cloned())
        .with_configuration_property(deltalake::TableProperty::EnableChangeDataFeed, Some("true"))
        .with_configuration_property(deltalake::TableProperty::MinReaderVersion, Some("3"))
        .with_configuration_property(deltalake::TableProperty::MinWriterVersion, Some("7"))
        .with_configuration_property(
            deltalake::TableProperty::LogRetentionDuration,
            Some("interval 1 second"),
        )
        .with_configuration_property(
            deltalake::TableProperty::DeletedFileRetentionDuration,
            Some("interval 1 second"),
        )
        .with_table_name("users")
        .with_partition_columns(["date"])
        .await?;

    Ok(table)
}

async fn insert_test_data(mut table: DeltaTable) -> Result<DeltaTable> {
    let test_date = NaiveDate::parse_from_str(TEST_DATE, "%Y-%m-%d")?;
    let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
    let date_value = test_date.signed_duration_since(epoch).num_days() as i32;

    let arrow_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("date", DataType::Date32, false),
    ]));

    for iteration in 1..=TOTAL_ITERATIONS {
        // Generate data for this iteration
        let start_id = ((iteration - 1) * ROWS_PER_ITERATION) as i64;
        let ids: Vec<i64> = (start_id..start_id + ROWS_PER_ITERATION as i64).collect();
        let dates: Vec<i32> = vec![date_value; ROWS_PER_ITERATION];

        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(Date32Array::from(dates)),
            ],
        )?;

        let writer_props = WriterProperties::builder()
            .set_compression(deltalake::parquet::basic::Compression::SNAPPY)
            .build();

        // Prepare the write operation with the collected batches
        let write_op = deltalake::DeltaOps(table.clone())
            .write([batch])
            .with_schema_mode(deltalake::operations::write::SchemaMode::Merge)
            .with_partition_columns(["date"])
            .with_writer_properties(writer_props);

        table = write_op.await?;
    }

    Ok(table)
}

async fn records_count(table: &DeltaTable) -> Result<i64> {
    let ctx = SessionContext::new();
    ctx.register_table("users", Arc::new(table.clone()))?;
    let df = ctx.sql("SELECT COUNT(*) as row_count FROM users").await?;
    let batches = df.collect().await?;
    let records = batches
        .first()
        .unwrap()
        .column(0)
        .as_any()
        .downcast_ref::<deltalake::arrow::array::Int64Array>()
        .unwrap()
        .value(0);

    Ok(records)
}

async fn delete_partition_data(table: DeltaTable) -> Result<DeltaTable> {
    let (table, _) = DeltaOps(table)
        .delete()
        .with_predicate(format!("date = DATE '{}'", TEST_DATE))
        .await?;
    println!("Delete operation completed");
    Ok(table)
}

async fn run_optimize_and_vacuum(table: DeltaTable) -> Result<DeltaTable> {
    println!("Running optimize operation...");
    let (table, _) = DeltaOps(table).optimize().await?;

    sleep(Duration::from_secs(1)).await;

    println!("Running vacuum operation...");
    let (table, _) = DeltaOps(table)
        .vacuum()
        .with_retention_period(chrono::Duration::seconds(1))
        .with_dry_run(false)
        .await?;

    Ok(table)
}
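
For triage it may also help to look at the metrics returned by the delete builder; a hedged variant of delete_partition_data (the helper name is hypothetical, and it assumes DeleteMetrics implements Debug in 0.27, as in recent releases):

async fn delete_partition_data_with_metrics(table: DeltaTable) -> Result<DeltaTable> {
    let (table, metrics) = DeltaOps(table)
        .delete()
        .with_predicate(format!("date = DATE '{}'", TEST_DATE))
        .await?;
    // Debug-print the returned metrics to see whether files were rewritten
    // rather than simply removed from the partition.
    println!("Delete metrics: {:?}", metrics);
    Ok(table)
}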
Cargo.toml:

[package]
name = "dtable"
version = "0.1.0"
edition = "2024"

[dependencies]
deltalake = { version = "0.27", features = ["datafusion"] }
tokio = { version = "1.0", features = ["full"] }
chrono = "0.4"
anyhow = "1.0"
serde_json = "1.0"
datafusion = "48.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

More details:
With Change Data Feed disabled, the same delete does not generate data in the _change_data directory.
With it enabled, running a full partition delete (e.g. ~10 GB per partition) uses a lot of resources for a considerable amount of time to generate the data in the _change_data directory.
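
For comparison, a sketch of the table creation with Change Data Feed left disabled (the helper name create_users_table_without_cdf is hypothetical; everything else mirrors create_users_table above):

// Variant of create_users_table() without EnableChangeDataFeed; with this
// configuration the same full partition delete does not populate _change_data.
async fn create_users_table_without_cdf() -> Result<DeltaTable> {
    let schema = StructType::new(vec![
        StructField::new("id", PrimitiveType::Long, false),
        StructField::new("date", PrimitiveType::Date, false),
    ]);
    let table = deltalake::DeltaOps::try_from_uri(TABLE_PATH)
        .await?
        .create()
        .with_columns(schema.fields().cloned())
        .with_table_name("users")
        .with_partition_columns(["date"])
        .await?;
    Ok(table)
}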
