
Delete appears to be single threaded. #2574

Open

Description

@adamfaulkner-at

Environment

Delta-rs version: 0.17.3

Binding: Rust

Environment:

  • Cloud provider: AWS
  • OS: Ubuntu 22.04.3 LTS
  • Other:

Bug

What happened:

I'm trying to delete 18 rows from a Delta Lake table that holds 100M rows. The table is stored in Amazon S3 as 38 parquet files of roughly 100 MB each. To do this, I build a predicate that uses an IN list on "rideid", the column the table is sorted by.

    // Enable parquet filter pushdown/reordering and 32 target partitions.
    let session_config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.pushdown_filters", true)
        .set_bool("datafusion.execution.parquet.reorder_filters", true)
        .set_usize("datafusion.execution.target_partitions", 32);
    let ctx = SessionContext::new_with_config(session_config);
...
            // Build a `rideid IN (...)` predicate from the ids gathered earlier.
            let delete_predicate = col("rideid").in_list(
                overlap_results
                    .iter()
                    .flat_map(|overlap_result_batch| {
                        let rideid_column: &StringArray = overlap_result_batch
                            .column_by_name("rideid")
                            .unwrap()
                            .as_string();
                        (0..rideid_column.len()).map(|i| lit(rideid_column.value(i)))
                    })
                    .collect(),
                false,
            );
            // Run the delete with the configured session state, writing
            // ZSTD-compressed output.
            let (delete_table, delete_metrics) = DeltaOps(table)
                .delete()
                .with_predicate(delete_predicate)
                .with_session_state(SessionState::new_with_config_rt(
                    ctx.copied_config(),
                    ctx.runtime_env(),
                ))
                .with_writer_properties(
                    WriterProperties::builder()
                        .set_compression(Compression::ZSTD(ZstdLevel::default()))
                        .build(),
                )
                .await?;
            println!(
                "Overlapping records deleted. Elapsed Time: {} {:?}",
                batch_start_time.elapsed().as_secs_f64(),
                delete_metrics
            );
Overlapping records deleted. Elapsed Time: 55.433372588 DeleteMetrics { num_added_files: 12, num_removed_files: 12, num_deleted_rows: Some(18), num_copied_rows: Some(32170369), execution_time_ms: 52076, scan_time_ms: 243, rewrite_time_ms: 51832 }

(overlap_result_batch is just a RecordBatch containing the rideids I'd like to delete; I don't think it's important for understanding the issue.)

I don't expect this to be fast, since it does need to rewrite 12 of the 38 files (about 32M rows). What I'm observing, though, is that it uses only one core for 50 seconds or so, rewriting the affected files one at a time; the metrics above bear this out, with rewrite_time_ms accounting for 51.8 of the 52 seconds spent.

What you expected to happen:
I would expect delete to rewrite all of the affected files concurrently rather than one at a time, with the degree of concurrency exposed in configuration somewhere.
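
For illustration, here is a minimal sketch of the shape I have in mind, with bounded fan-out via futures' buffer_unordered. This is not delta-rs internals: rewrite_file, rewrite_all, and the concurrency limit are all hypothetical stand-ins.

    use futures::stream::{self, StreamExt};

    // Stand-in for "scan one parquet file, drop matching rows, write a new file".
    async fn rewrite_file(path: String) -> std::io::Result<String> {
        Ok(path)
    }

    // Rewrite files with at most `max_concurrent` rewrites in flight.
    async fn rewrite_all(files: Vec<String>, max_concurrent: usize) -> std::io::Result<Vec<String>> {
        stream::iter(files)
            .map(rewrite_file)
            .buffer_unordered(max_concurrent)
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect()
    }

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let files: Vec<String> = (0..12).map(|i| format!("part-{i}.parquet")).collect();
        let rewritten = rewrite_all(files, 8).await?;
        println!("rewrote {} files", rewritten.len());
        Ok(())
    }

Since the per-file rewrite is CPU-heavy, each future would likely also need to be moved onto its own worker (e.g. via tokio::spawn) rather than just buffered on a single task.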

How to reproduce it:
Perform a delete on a table that spans multiple parquet files and watch CPU utilization while the rewrite runs.
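
A minimal sketch of such a delete, assuming a local table at ./table with a string column id (path and column name are both hypothetical); the delete/with_predicate calls are the same ones used in the snippet above:

    use deltalake::DeltaOps;
    use datafusion::prelude::{col, lit};

    #[tokio::main]
    async fn main() -> Result<(), deltalake::DeltaTableError> {
        // Open an existing multi-file table (hypothetical path).
        let table = deltalake::open_table("./table").await?;
        // Delete a few rows and print the rewrite metrics.
        let (table, metrics) = DeltaOps(table)
            .delete()
            .with_predicate(col("id").eq(lit("some-id")))
            .await?;
        println!("version {}: {:?}", table.version(), metrics);
        Ok(())
    }

(The datafusion version has to match the one deltalake is built against for the Expr types to line up.)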

Metadata

    Labels

    binding/rust (Issues for the Rust crate), bug (Something isn't working)
