
Unable to write concurrently to delta table with python deltalake 1.14.0 library #3736

@coenvd

Description


Environment

Delta-rs version: 1.14.0

Binding: Python

Environment: All

  • Cloud provider: Azure, but also local
  • OS: All
  • Other: All

Bug

What happened:

I'm trying to write concurrently to a single Delta table, but I run into Delta log version collisions when I do.

I noticed it first in my cloud setup, which is essentially a Python app that runs on Kubernetes, streams data from a message queue to a Delta table in ADLS, and scales horizontally based on queue depth. In high-volume use cases, multiple pods resolve the same Delta log version for their commit, which is rejected. After an unknown number of retries the process crashes (see the retry sketch after the reproduction script below), which in turn increases the volume on the queue because the messages get processed again.

I have reproduced this behavior locally with the script below, which also shows how many commits are dropped. On my machine, 142 out of 2000 commits are dropped with CONCURRENCY_LIMIT set to 10; when I increase that limit, the number of dropped commits increases as well.

What you expected to happen: I expect, or rather hope, that it is possible to write concurrently to a Delta table, perhaps with additional configuration. I'm not yet fully convinced that this is a bug, but I have been banging my head against this issue for so long that I thought I'd report it here as well.
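The closest configuration knob I have found is CommitProperties. Below is a minimal sketch of what I hoped would absorb the collisions; treat max_commit_retries and its effect as my assumption, as I have not confirmed it changes the behavior of the reproduction below:

from deltalake import CommitProperties, write_deltalake

# Assumption: max_commit_retries bounds the writer-internal retries after a
# log version collision before the commit is given up.
commit_properties = CommitProperties(max_commit_retries=100)

write_deltalake(
    './datatest_async/',
    append_data,
    mode='append',
    commit_properties=commit_properties,
)

A higher retry budget like this is my best guess at the "additional configuration" mentioned above.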

How to reproduce it:

import asyncio
import os
import shutil
from deltalake import DeltaTable, write_deltalake
import polars as pl

CONCURRENCY_LIMIT = 10
TOTAL_APPENDS = 2000
DATA_PATH = './datatest_async/'

async def append_to_table(semaphore: asyncio.Semaphore, table_uri: str, data_to_append):
    """
    Asynchronously appends data to a Delta table, respecting the semaphore limit.
    """
    async with semaphore:
        try:
            await asyncio.to_thread(
                write_deltalake,
                table_uri,
                data_to_append,
                mode='append',
            )
            print(".", end="", flush=True)
        except Exception as e:
            # A failed write: this commit is dropped and counted
            # against the expected row total at the end.
            print(f"An error occurred during write: {e}")


async def main():
    """
    Sets up the initial Delta table and runs concurrent append operations.
    """

    schema = {
        'col1': pl.Utf8,
        'col2': pl.List(pl.Int64),
    }

    initial_table = pl.DataFrame([{'col1': "init", 'col2': [1, 2, 3]}], schema=schema).to_arrow()
    append_data = pl.DataFrame([{'col1': "purchase", 'col2': [4, 5, 6]}], schema=schema).to_arrow()

    if os.path.exists(DATA_PATH):
        shutil.rmtree(DATA_PATH)

    print(f"Creating initial Delta table at '{DATA_PATH}'...")
    write_deltalake(DATA_PATH, initial_table, mode='overwrite')

    semaphore = asyncio.Semaphore(CONCURRENCY_LIMIT)

    print(
        f"\nStarting {TOTAL_APPENDS} append operations with a concurrency limit of {CONCURRENCY_LIMIT}..."
    )

    tasks = [
        append_to_table(semaphore, DATA_PATH, append_data)
        for _ in range(TOTAL_APPENDS)
    ]

    await asyncio.gather(*tasks)

    print("\n\nAll append operations completed.")
    final_dt = DeltaTable(DATA_PATH)
    initial_rows = initial_table.num_rows
    appended_rows_per_task = append_data.num_rows
    expected_rows = initial_rows + (TOTAL_APPENDS * appended_rows_per_task)

    print(f"Final table version: {final_dt.version()}")
    print(f"Final table row count: {pl.DataFrame(final_dt.to_pyarrow_table()).height}")
    print(f"Expected row count: {expected_rows}")


if __name__ == "__main__":
    asyncio.run(main())
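
For completeness, the only mitigation I can think of is an application-level retry around each write. A hedged sketch, assuming CommitFailedError from deltalake.exceptions is what a version collision raises and that simply re-invoking write_deltalake is safe for an append:

import random
import time

from deltalake import write_deltalake
from deltalake.exceptions import CommitFailedError

def append_with_retry(table_uri: str, data, max_attempts: int = 20) -> None:
    """Append with jittered backoff when the log commit collides."""
    for attempt in range(max_attempts):
        try:
            write_deltalake(table_uri, data, mode='append')
            return
        except CommitFailedError:
            # Another writer claimed our target log version; back off and retry.
            time.sleep(random.uniform(0, 0.1 * (attempt + 1)))
    raise RuntimeError(f"commit still failing after {max_attempts} attempts")

In the reproduction above, each task would call this via asyncio.to_thread(append_with_retry, DATA_PATH, append_data).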

More details: It seems to me that this issue is related, but I'm not fully sure, as I don't fully grasp the point made in that bug report.
