# Environment

**Delta-rs version**: 0.17.3

**Binding**: python
# Bug

**What happened**:
I'm seeing a `Failed to commit transaction: 15` error when trying to concurrently append an iterator of RecordBatches. When I switch it to a pyarrow Table instead of an iterator, the error goes away; this is reproducible. Looking through the `write_deltalake` code, it seems to convert everything to a `RecordBatchReader`, so I'm not sure where this is coming from.
**What you expected to happen**:

To be able to concurrently append iterators of RecordBatches.
**How to reproduce it**:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import random
import time

import pandas as pd
import pyarrow as pa

from deltalake import DeltaTable, write_deltalake

SCHEMA = pa.schema(
    [
        ("a", pa.int64()),
        ("b", pa.int64()),
        ("c", pa.int64()),
    ]
)


def random_table():
    random_data = pd.DataFrame.from_dict(
        {
            "a": [random.randint(1, 100) for _ in range(500)],
            "b": [random.randint(1, 100) for _ in range(500)],
            "c": [random.randint(1, 100) for _ in range(500)],
        }
    )
    time.sleep(random.random())
    yield pa.Table.from_pandas(random_data, schema=SCHEMA).to_batches()[0]
    # return pa.Table.from_pandas(random_data, schema=SCHEMA)


def write_table(table):
    write_deltalake(table, random_table(), schema=SCHEMA, mode="append")


def main():
    write_deltalake(
        "parallel_table",
        pa.RecordBatchReader.from_batches(SCHEMA, random_table()),
        mode="overwrite",
        schema=SCHEMA,
    )
    tables = [DeltaTable("parallel_table") for _ in range(50)]
    count = 0
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(write_table, table) for table in tables]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                count += 1
                print(f"{count} An error occurred: {e}")


if __name__ == "__main__":
    main()
```
If you swap which of the two lines in `random_table` is commented out (yielding a RecordBatch vs. returning a pyarrow Table), you will see the two scenarios.