Error with concurrent append and full=True vacuum #3645

@nicolo-barboni

Description

Environment

Delta-rs version: <=1.1.3

Binding: Python

Environment:

  • Cloud provider: Azure
  • OS: MacOS
  • Other:

Bug

What happened:
When you vacuum a table with full=True while another process is appending to it, a Parquet file written during the vacuum can be deleted even though the log entry for that append is committed afterwards. The next compaction then fails, because the log references a Parquet file that has in reality been deleted.

Compact iteration 2: performing compact...
Error during vacuum/compact: Failed to parse parquet: External: Object at location /Users/nicolo/Documents/optibid/backend/apps/deltalake_manager/test_concurrent/part-00001-cb2b350b-b11a-406b-a687-7e3b3d84362c-c000.snappy.parquet not found: No such file or directory (os error 2)

I suspect the issue arises from the ordering of operations: the Parquet file is uploaded before the corresponding JSON log entry is written. At the time the vacuum runs, the new Parquet file has been physically written but is not yet referenced in the Delta log (because the JSON log entry hasn't been written).
Since the file isn't yet listed in the _delta_log metadata, the vacuum treats it as orphaned and deletes it.
Afterwards, the JSON log entry is written, adding a reference to a Parquet file that no longer exists.
The result is that the Delta log contains an 'add' action for the Parquet file (as seen in the picture) with no corresponding 'remove' entry, yet the Parquet file itself is missing.
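The suspected race can be sketched as a timeline. This is a simplified simulation using plain files and a set standing in for the _delta_log, not the actual delta-rs internals; all names (`table_dir`, `committed`, `append`, `commit`, `vacuum_full`) are illustrative:

```python
import os
import tempfile

# Simplified simulation of the suspected race (not delta-rs internals).
# A "commit" is just a set of file names the log references.
table_dir = tempfile.mkdtemp()
committed = set()  # stands in for the _delta_log

def append(name):
    path = os.path.join(table_dir, name)
    with open(path, "w") as f:          # step 1: the parquet is uploaded first
        f.write("data")
    return path

def commit(name):
    committed.add(name)                  # step 2: the JSON log entry is written later

def vacuum_full():
    # full=True behaviour: delete every file not referenced in the log
    for name in os.listdir(table_dir):
        if name not in committed:
            os.remove(os.path.join(table_dir, name))

path = append("part-00001.parquet")      # writer: file lands on storage
vacuum_full()                            # vacuum runs between the two steps
commit("part-00001.parquet")             # writer: log now references a ghost file

print(os.path.exists(path))  # False: the log says the file exists, storage disagrees
```

The window between `append` and `commit` is exactly where the vacuum sees the file as orphaned.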

[Screenshot: _delta_log JSON showing the 'add' action for the missing Parquet file]

We first saw this issue when we vacuumed a table stored in an Azure Data Lake Storage Gen2 account while another process was appending data to the Delta table every 15 seconds. The error can also be reproduced locally.

The output of the error reproduction shows:

(deltalake_manager) nicolo@NICOLO-VERSIRO deltalake_manager % /Users/nicolo/Documents/optibid/backend/apps/deltalake_manager/.venv/bin/python /Users/nicolo/Documents/optibid/backend/apps/deltalake_manager/test_deltalake.py
Starting concurrent upload and vacuum test...
Upload thread will add data every 0.1 seconds
Vacuum thread will check and vacuum every 1.0 seconds
Test will run for 30 seconds...
Compact iteration 0: performing compact...
Compact iteration 0: completed
The number of files to vacuum (full): 0
The number of files to vacuum (not full): 0
Vacuum iteration 0: 0 files vacuumed (full)
Uploaded 50 batches
Compact iteration 1: performing compact...
[2025-08-01T15:03:29Z WARN  deltalake_core::kernel::transaction] Attempting to write a transaction 58 but the underlying table has been updated to 59
    DefaultLogStore(/Users/nicolo/Documents/optibid/backend/apps/deltalake_manager/test_concurrent/)
[2025-08-01T15:03:29Z ERROR deltalake_core::kernel::transaction] The transaction 60 already exists, will retry!
[2025-08-01T15:03:29Z WARN  deltalake_core::kernel::transaction] Attempting to write a transaction 60 but the underlying table has been updated to 60
    DefaultLogStore(/Users/nicolo/Documents/optibid/backend/apps/deltalake_manager/test_concurrent/)
[2025-08-01T15:03:29Z WARN  deltalake_core::kernel::transaction] Attempting to write a transaction 61 but the underlying table has been updated to 61
    DefaultLogStore(/Users/nicolo/Documents/optibid/backend/apps/deltalake_manager/test_concurrent/)
Compact iteration 1: completed
The number of files to vacuum (full): 57
The number of files to vacuum (not full): 56
[2025-08-01T15:03:29Z WARN  deltalake_core::kernel::transaction] Attempting to write a transaction 62 but the underlying table has been updated to 62
    DefaultLogStore(/Users/nicolo/Documents/optibid/backend/apps/deltalake_manager/test_concurrent/)
[2025-08-01T15:03:29Z WARN  deltalake_core::kernel::transaction] Attempting to write a transaction 62 but the underlying table has been updated to 63
    DefaultLogStore(/Users/nicolo/Documents/optibid/backend/apps/deltalake_manager/test_concurrent/)
Vacuum iteration 1: 57 files vacuumed (full)
[2025-08-01T15:03:29Z ERROR deltalake_core::kernel::transaction] The transaction 64 already exists, will retry!
[2025-08-01T15:03:29Z WARN  deltalake_core::kernel::transaction] Attempting to write a transaction 64 but the underlying table has been updated to 64
    DefaultLogStore(/Users/nicolo/Documents/optibid/backend/apps/deltalake_manager/test_concurrent/)
Uploaded 100 batches
Compact iteration 2: performing compact...
Error during vacuum/compact: Failed to parse parquet: External: Object at location /Users/nicolo/Documents/optibid/backend/apps/deltalake_manager/test_concurrent/part-00001-cb2b350b-b11a-406b-a687-7e3b3d84362c-c000.snappy.parquet not found: No such file or directory (os error 2)
Vacuum/Compact thread finished after 2 iterations
Uploaded 150 batches
Uploaded 200 batches
Uploaded 250 batches
Uploaded 300 batches
Uploaded 350 batches
Uploaded 400 batches
Uploaded 450 batches
Uploaded 500 batches
Upload thread finished after 503 iterations

What you expected to happen:
I would not expect a file that is appended while the vacuum is running to be deleted.
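Until the underlying race is fixed, one possible mitigation is to never delete unreferenced files that are younger than a grace period, so an in-flight append still has time to commit its log entry. The sketch below is illustrative only, not a delta-rs API; `committed`, `safe_vacuum`, and `GRACE_SECONDS` are hypothetical names:

```python
import os
import tempfile
import time

# Sketch of a safer orphan cleanup: skip unreferenced files newer than a
# grace period, so an in-flight append can still commit its log entry.
# `committed` and GRACE_SECONDS are illustrative, not delta-rs options.
GRACE_SECONDS = 3600

def safe_vacuum(table_dir, committed):
    removed = []
    now = time.time()
    for name in os.listdir(table_dir):
        path = os.path.join(table_dir, name)
        if name in committed:
            continue  # still referenced by the log
        if now - os.path.getmtime(path) < GRACE_SECONDS:
            continue  # too new: may belong to an uncommitted append
        os.remove(path)
        removed.append(name)
    return removed

# A freshly written, not-yet-committed file survives this vacuum:
d = tempfile.mkdtemp()
open(os.path.join(d, "part-new.parquet"), "w").close()
print(safe_vacuum(d, committed=set()))  # → [] (file is within the grace period)
```

This is essentially what the non-full vacuum's retention window already provides; the bug is that full=True removes that protection even for files written concurrently.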

How to reproduce it:

import os
import shutil
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import pandas as pd
from deltalake import DeltaTable, write_deltalake
from deltalake.schema import Field, Schema


def upload_data_continuously(dt, df, stop_event, upload_interval=0.1):
    """Upload data continuously until stop event is set"""
    iteration = 0
    while not stop_event.is_set():
        try:
            write_deltalake(dt, df, mode='append')
            iteration += 1
            if iteration % 50 == 0:
                print(f'Uploaded {iteration} batches')
            time.sleep(upload_interval)
        except Exception as e:
            print(f'Error during upload: {e}')
            break
    print(f'Upload thread finished after {iteration} iterations')


def vacuum_continuously(dt, stop_event, vacuum_interval=1.0, full=False):
    """Vacuum data continuously until stop event is set"""
    iteration = 0
    while not stop_event.is_set():
        try:
            # Perform compact operation
            print(f'Compact iteration {iteration}: performing compact...')
            dt.optimize.compact()
            print(f'Compact iteration {iteration}: completed')

            print(f'The number of files to vacuum (full): {len(dt.vacuum(dry_run=True, full=True))}')
            print(f'The number of files to vacuum (not full): {len(dt.vacuum(dry_run=True, full=False))}')

            if not full:
                # Actually perform vacuum
                files_to_vacuum = dt.vacuum(dry_run=False, full=False)
                print(f'Vacuum iteration {iteration}: {len(files_to_vacuum)} files vacuumed (not full)')
            else:
                files_to_vacuum_full = dt.vacuum(dry_run=False, full=True)
                print(f'Vacuum iteration {iteration}: {len(files_to_vacuum_full)} files vacuumed (full)')

            iteration += 1
            time.sleep(vacuum_interval)
        except Exception as e:
            print(f'Error during vacuum/compact: {e}')
            break
    print(f'Vacuum/Compact thread finished after {iteration} iterations')


def test2(full):
    """Test uploading data continuously while vacuuming in parallel"""
    name_table = 'test_concurrent'

    # Clean up existing table
    if os.path.exists(name_table):
        shutil.rmtree(name_table)

    # Create schema and initial data
    schema_fields = [Field('num', 'integer', nullable=False), Field('letter', 'string', nullable=False)]
    df = pd.DataFrame({'num': [8, 9] * 100, 'letter': ['dd', 'ee'] * 100})

    # Create Delta table
    if not DeltaTable.is_deltatable(name_table):
        schema = Schema(schema_fields)
        dt = DeltaTable.create(name_table, schema=schema)
        dt.alter.set_table_properties({'delta.deletedFileRetentionDuration': 'interval 0 seconds'})
    else:
        dt = DeltaTable(name_table)

    # Create stop event for threads
    stop_event = threading.Event()

    print('Starting concurrent upload and vacuum test...')
    print('Upload thread will add data every 0.1 seconds')
    print('Vacuum thread will check and vacuum every 1.0 seconds')
    print('Test will run for 30 seconds...')

    # Start threads
    with ThreadPoolExecutor(max_workers=2) as executor:
        upload_future = executor.submit(upload_data_continuously, dt, df, stop_event, 0.01)
        vacuum_future = executor.submit(vacuum_continuously, dt, stop_event, 1.0, full=full)

        # Let the test run for 10 seconds
        time.sleep(10)

        # Signal threads to stop
        stop_event.set()

        # Wait for threads to finish
        upload_future.result()
        vacuum_future.result()

    print('Test completed!')
    print(f'Final table version: {dt.version()}')
    print(f'Final files vacuumed: {len(dt.vacuum(dry_run=False, full=full))}')


if __name__ == '__main__':
    test2(full=True)

More details:

Labels: bug