
Commit 663254b

feat(delta-table): add record-level upsert (#543)
- `DeltaTableUploadStager`: injects `RECORD_ID_LABEL` on every row, enabling per-record tracking.
- `DeltaTableUploader.upload_dataframe`: deletes rows whose `record_id` matches the current file before appending new data, instead of doing a full table overwrite (see the sketch below).
- Handles concurrent writers by wrapping the delete-then-append sequence in a tenacity-based retry (10 attempts, random 0.2-1 s back-off) that re-runs only on commit-conflict errors ("CommitFailed", "Metadata changed").
- Keeps the SIGABRT work-around: runs the writer in a subprocess unless executing inside a daemon worker; error propagation is unified through a single `multiprocessing.Queue` (sketched after the file tree).
- Refactor: removed duplicated queue/error-handling blocks, preserved detailed explanatory comments.
- Added `@requires_dependencies(["tenacity"], extras="delta-table")` to enforce the new optional dependency.
1 parent 3634978 commit 663254b
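
The delete-then-append upsert and the commit-conflict retry described above can be pictured with a minimal sketch. This is not the repository's code: `upsert_dataframe` is a hypothetical stand-in that folds the stager's `RECORD_ID_LABEL` injection and the uploader's delete-then-append into one function, the column name is assumed to be `record_id`, and the conflict check simply matches the "CommitFailed" / "Metadata changed" strings named in the commit message.

```python
import pandas as pd
from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import TableNotFoundError
from tenacity import retry, retry_if_exception, stop_after_attempt, wait_random

RECORD_ID_LABEL = "record_id"  # column the stager injects on every row (assumed name)


def _is_commit_conflict(exc: BaseException) -> bool:
    # Only concurrent-writer commit conflicts are worth retrying.
    msg = str(exc)
    return "CommitFailed" in msg or "Metadata changed" in msg


@retry(
    stop=stop_after_attempt(10),
    wait=wait_random(min=0.2, max=1.0),
    retry=retry_if_exception(_is_commit_conflict),
    reraise=True,
)
def upsert_dataframe(table_uri: str, df: pd.DataFrame, record_id: str) -> None:
    """Delete this file's previous rows, then append the new ones."""
    df = df.copy()
    df[RECORD_ID_LABEL] = record_id  # per-record tracking label
    try:
        table = DeltaTable(table_uri)
    except TableNotFoundError:
        table = None  # first write will create the table
    if table is not None:
        # Record-level upsert: drop only the rows previously written for this file.
        table.delete(predicate=f"{RECORD_ID_LABEL} = '{record_id}'")
    write_deltalake(table_uri, df, mode="append")
```

With this shape, re-processing a file replaces only its own rows, and two pipelines writing different files can both succeed: the loser of a commit race simply retries its small delete-then-append transaction.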

File tree

12 files changed (+154, -55 lines)

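Before the per-file diffs, here is a rough sketch of the SIGABRT work-around the commit message describes: run the Delta writer in a subprocess unless the caller is already a daemon worker, and funnel any failure back through one queue. The names (`run_writer`, `_writer_target`, and the `upsert_dataframe` helper from the sketch above) are illustrative, not the connector's actual API.

```python
import multiprocessing

import pandas as pd


def _writer_target(queue, table_uri: str, df: pd.DataFrame, record_id: str) -> None:
    try:
        upsert_dataframe(table_uri, df, record_id)  # hypothetical helper from the sketch above
    except Exception as exc:
        queue.put(exc)  # a single queue carries any error back to the parent


def run_writer(table_uri: str, df: pd.DataFrame, record_id: str) -> None:
    queue = multiprocessing.Queue()
    if multiprocessing.current_process().daemon:
        # Daemon workers cannot spawn children, so write in-process instead.
        _writer_target(queue, table_uri, df, record_id)
    else:
        # Isolate the native Delta writer so a SIGABRT cannot take down the pipeline.
        proc = multiprocessing.Process(target=_writer_target, args=(queue, table_uri, df, record_id))
        proc.start()
        proc.join()
    if not queue.empty():
        raise queue.get()
```

Joining the worker before draining the queue keeps error propagation in one place, which is what the refactor item above consolidates.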

CHANGELOG.md

Lines changed: 4 additions & 0 deletions

@@ -1,3 +1,7 @@
+## 1.0.44
+
+* **Improve DeltaTable ingestion process and reliability**
+
 ## 1.0.43

 * **Fix document limits in Confluence connector**
Lines changed: 1 addition & 0 deletions

@@ -1,3 +1,4 @@
 pandas
 deltalake
 boto3
+tenacity

test/integration/connectors/elasticsearch/test_elasticsearch.py

Lines changed: 1 addition & 1 deletion

@@ -206,7 +206,7 @@ async def test_elasticsearch_source(source_index: str, movies_dataframe: pd.Data
             validate_downloaded_files=True,
             predownload_file_data_check=source_filedata_display_name_set_check,
             postdownload_file_data_check=source_filedata_display_name_set_check,
-            exclude_fields_extend=["display_name"] # includes dynamic ids, might change
+            exclude_fields_extend=["display_name"], # includes dynamic ids, might change
         ),
     )

test/integration/connectors/elasticsearch/test_opensearch.py

Lines changed: 1 addition & 1 deletion

@@ -196,7 +196,7 @@ async def test_opensearch_source(source_index: str, movies_dataframe: pd.DataFra
             validate_downloaded_files=True,
             predownload_file_data_check=source_filedata_display_name_set_check,
             postdownload_file_data_check=source_filedata_display_name_set_check,
-            exclude_fields_extend=["display_name"] # includes dynamic ids, might change
+            exclude_fields_extend=["display_name"], # includes dynamic ids, might change
         ),
     )

test/integration/connectors/test_astradb.py

Lines changed: 1 addition & 1 deletion

@@ -173,7 +173,7 @@ async def test_astra_search_source(
             validate_downloaded_files=True,
             predownload_file_data_check=source_filedata_display_name_set_check,
             postdownload_file_data_check=source_filedata_display_name_set_check,
-            exclude_fields_extend=["display_name"] # includes dynamic ids, might change
+            exclude_fields_extend=["display_name"], # includes dynamic ids, might change
         ),
     )

test/integration/connectors/test_confluence.py

Lines changed: 2 additions & 2 deletions

@@ -76,6 +76,6 @@ async def test_confluence_source_param(
             test_id=test_id,
             expected_num_files=expected_num_files,
             validate_downloaded_files=validate_downloaded_files,
-            validate_file_data=validate_file_data
+            validate_file_data=validate_file_data,
         ),
-    )
+    )

test/integration/connectors/test_delta_table.py

Lines changed: 49 additions & 13 deletions

@@ -5,6 +5,7 @@
 import pytest
 from deltalake import DeltaTable
 from fsspec import get_filesystem_class
+from pydantic import Secret

 from test.integration.connectors.utils.constants import DESTINATION_TAG, SQL_TAG
 from test.integration.utils import requires_env
@@ -27,13 +28,24 @@
 async def test_delta_table_destination_local(upload_file: Path, temp_dir: Path):
     destination_path = str(temp_dir)
     connection_config = DeltaTableConnectionConfig(
-        access_config=DeltaTableAccessConfig(),
+        access_config=Secret(DeltaTableAccessConfig()),
         table_uri=destination_path,
     )
     stager_config = DeltaTableUploadStagerConfig()
     stager = DeltaTableUploadStager(upload_stager_config=stager_config)
+
+    mock_file_data = FileData(
+        identifier="mock file data",
+        connector_type=CONNECTOR_TYPE,
+        source_identifiers=SourceIdentifiers(
+            filename=upload_file.name,
+            fullpath=upload_file.name,
+        ),
+    )
+
     new_upload_file = stager.run(
         elements_filepath=upload_file,
+        file_data=mock_file_data,
         output_dir=temp_dir,
         output_filename=upload_file.name,
     )
@@ -52,11 +64,10 @@ async def test_delta_table_destination_local(upload_file: Path, temp_dir: Path):
         await uploader.run_async(path=new_upload_file, file_data=file_data)
     else:
         uploader.run(path=new_upload_file, file_data=file_data)
-    delta_table_path = os.path.join(destination_path, upload_file.name)
-    delta_table = DeltaTable(table_uri=delta_table_path)
+    delta_table = DeltaTable(table_uri=destination_path)
     df = delta_table.to_pandas()

-    EXPECTED_COLUMNS = 10
+    EXPECTED_COLUMNS = 11
     EXPECTED_ROWS = 22
     assert len(df) == EXPECTED_ROWS, (
         f"Number of rows in table vs expected: {len(df)}/{EXPECTED_ROWS}"
@@ -86,17 +97,30 @@ async def test_delta_table_destination_s3(upload_file: Path, temp_dir: Path):
     s3_bucket = "s3://utic-platform-test-destination"
     destination_path = f"{s3_bucket}/destination/test"
     connection_config = DeltaTableConnectionConfig(
-        access_config=DeltaTableAccessConfig(
-            aws_access_key_id=aws_credentials["AWS_ACCESS_KEY_ID"],
-            aws_secret_access_key=aws_credentials["AWS_SECRET_ACCESS_KEY"],
+        access_config=Secret(
+            DeltaTableAccessConfig(
+                aws_access_key_id=aws_credentials["AWS_ACCESS_KEY_ID"],
+                aws_secret_access_key=aws_credentials["AWS_SECRET_ACCESS_KEY"],
+            )
         ),
         aws_region=aws_credentials["AWS_REGION"],
         table_uri=destination_path,
     )
     stager_config = DeltaTableUploadStagerConfig()
     stager = DeltaTableUploadStager(upload_stager_config=stager_config)
+
+    mock_file_data = FileData(
+        identifier="mock file data",
+        connector_type=CONNECTOR_TYPE,
+        source_identifiers=SourceIdentifiers(
+            filename=upload_file.name,
+            fullpath=upload_file.name,
+        ),
+    )
+
     new_upload_file = stager.run(
         elements_filepath=upload_file,
+        file_data=mock_file_data,
         output_dir=temp_dir,
         output_filename=upload_file.name,
     )
@@ -117,11 +141,10 @@ async def test_delta_table_destination_s3(upload_file: Path, temp_dir: Path):
         await uploader.run_async(path=new_upload_file, file_data=file_data)
     else:
         uploader.run(path=new_upload_file, file_data=file_data)
-    delta_table_path = os.path.join(destination_path, upload_file.name)
-    delta_table = DeltaTable(table_uri=delta_table_path, storage_options=aws_credentials)
+    delta_table = DeltaTable(table_uri=destination_path, storage_options=aws_credentials)
     df = delta_table.to_pandas()

-    EXPECTED_COLUMNS = 10
+    EXPECTED_COLUMNS = 11
     EXPECTED_ROWS = 22
     assert len(df) == EXPECTED_ROWS, (
         f"Number of rows in table vs expected: {len(df)}/{EXPECTED_ROWS}"
@@ -149,17 +172,30 @@ async def test_delta_table_destination_s3_bad_creds(upload_file: Path, temp_dir:
     s3_bucket = "s3://utic-platform-test-destination"
     destination_path = f"{s3_bucket}/destination/test"
     connection_config = DeltaTableConnectionConfig(
-        access_config=DeltaTableAccessConfig(
-            aws_access_key_id=aws_credentials["AWS_ACCESS_KEY_ID"],
-            aws_secret_access_key=aws_credentials["AWS_SECRET_ACCESS_KEY"],
+        access_config=Secret(
+            DeltaTableAccessConfig(
+                aws_access_key_id=aws_credentials["AWS_ACCESS_KEY_ID"],
+                aws_secret_access_key=aws_credentials["AWS_SECRET_ACCESS_KEY"],
+            )
         ),
         aws_region=aws_credentials["AWS_REGION"],
         table_uri=destination_path,
     )
     stager_config = DeltaTableUploadStagerConfig()
     stager = DeltaTableUploadStager(upload_stager_config=stager_config)
+
+    mock_file_data = FileData(
+        identifier="mock file data",
+        connector_type=CONNECTOR_TYPE,
+        source_identifiers=SourceIdentifiers(
+            filename=upload_file.name,
+            fullpath=upload_file.name,
+        ),
+    )
+
     new_upload_file = stager.run(
         elements_filepath=upload_file,
+        file_data=mock_file_data,
         output_dir=temp_dir,
         output_filename=upload_file.name,
     )

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion

@@ -1 +1 @@
-__version__ = "1.0.43" # pragma: no cover
+__version__ = "1.0.44" # pragma: no cover

unstructured_ingest/processes/connectors/astradb.py

Lines changed: 4 additions & 2 deletions

@@ -197,8 +197,10 @@ def run(self, **kwargs: Any) -> Generator[AstraDBBatchFileData, None, None]:
         id_batches = batch_generator(ids, self.index_config.batch_size)
         for batch in id_batches:
             batch_items = [BatchItem(identifier=b) for b in batch]
-            display_name = (f"{self.index_config.collection_name}-{self.index_config.keyspace}"
-                            f"-[{batch_items[0].identifier}..{batch_items[-1].identifier}]")
+            display_name = (
+                f"{self.index_config.collection_name}-{self.index_config.keyspace}"
+                f"-[{batch_items[0].identifier}..{batch_items[-1].identifier}]"
+            )
             fd = AstraDBBatchFileData(
                 connector_type=CONNECTOR_TYPE,
                 metadata=FileDataSourceMetadata(

unstructured_ingest/processes/connectors/confluence.py

Lines changed: 1 addition & 1 deletion

@@ -191,7 +191,7 @@ def _get_docs_ids_within_one_space(self, space_key: str) -> List[dict]:
             status=None,
         )
         # Limit the number of documents to max_num_of_docs_from_each_space
-        # Note: this is needed because the limit field in client.get_all_pages_from_space does
+        # Note: this is needed because the limit field in client.get_all_pages_from_space does
         # not seem to work as expected
         limited_pages = pages[: self.index_config.max_num_of_docs_from_each_space]
         doc_ids = [{"space_id": space_key, "doc_id": page["id"]} for page in limited_pages]
