Skip to content

Commit 8b0a55a

Browse files
fix: ibm watsonx s3 expired token error (#470)
* Fix token expiration error in IBM watsonx connector
* Version and changelog bump; IBM watsonx connector fix
* Rename unit test
* Version and changelog bump; IBM watsonx connector fix
1 parent 43d6d65 commit 8b0a55a

File tree

4 files changed

+112
-9
lines changed

4 files changed

+112
-9
lines changed

CHANGELOG.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
## 1.0.12-dev0
1+
## 1.0.12
22

33
### Fixes
44

55
* **Replaced google drive connector's mechanism for file downloads.**
6+
* **Fixed token expiration error in the IBM watsonx.data connector.**
67

78
## 1.0.11
89

test/unit/connectors/ibm_watsonx/test_ibm_watsonx_s3.py

+74-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import pandas as pd
55
import pytest
66
from pydantic import Secret
7-
from pyiceberg.exceptions import CommitFailedException
7+
from pyiceberg.exceptions import CommitFailedException, RESTError
88
from pytest_mock import MockerFixture
99

1010
from unstructured_ingest.data_types.file_data import FileData, SourceIdentifiers
@@ -202,7 +202,7 @@ def test_ibm_watsonx_connection_config_bearer_token_soon_to_expire_token(
202202
):
203203
connection_config._bearer_token = {
204204
"access_token": "soon_to_expire_token",
205-
"expiration": timestamp_now + 60,
205+
"expiration": timestamp_now + (60 * 5),
206206
}
207207
mock_generate_bearer_token = mocker.patch.object(
208208
IbmWatsonxConnectionConfig,
@@ -332,6 +332,21 @@ def test_ibm_watsonx_uploader_upload_data_table_commit_exception(
332332
assert mock_table.refresh.call_count == 5
333333

334334

335+
def test_ibm_watsonx_uploader_upload_data_table_rest_error(
336+
uploader: IbmWatsonxUploader,
337+
mock_table: MagicMock,
338+
mock_transaction: MagicMock,
339+
mock_data_table: MagicMock,
340+
mock_delete: MagicMock,
341+
file_data: FileData,
342+
):
343+
mock_transaction.append.side_effect = RESTError()
344+
345+
with pytest.raises(RESTError):
346+
uploader.upload_data_table(mock_table, mock_data_table, file_data)
347+
assert mock_table.refresh.call_count == 0
348+
349+
335350
def test_ibm_watsonx_uploader_upload_data_table_exception(
336351
uploader: IbmWatsonxUploader,
337352
mock_table: MagicMock,
@@ -428,6 +443,63 @@ def test_ibm_watsonx_uploader_upload_dataframe_success(
428443
mock_upload_data_table.assert_called_once_with(mock_table, mock_data_table, file_data)
429444

430445

446+
def test_ibm_watsonx_uploader_upload_dataframe_rest_error(
447+
mocker: MockerFixture,
448+
uploader: IbmWatsonxUploader,
449+
test_df: pd.DataFrame,
450+
mock_get_table: MagicMock,
451+
mock_data_table: MagicMock,
452+
file_data: FileData,
453+
):
454+
mocker.patch.object(IbmWatsonxUploader, "_df_to_arrow_table", return_value=mock_data_table)
455+
mock_upload_data_table = mocker.patch.object(
456+
IbmWatsonxUploader, "upload_data_table", side_effect=RESTError()
457+
)
458+
459+
with pytest.raises(ProviderError):
460+
uploader.upload_dataframe(test_df, file_data)
461+
assert mock_get_table.call_count == 2
462+
assert mock_upload_data_table.call_count == 2
463+
464+
465+
def test_ibm_watsonx_uploader_upload_dataframe_provider_error(
466+
mocker: MockerFixture,
467+
uploader: IbmWatsonxUploader,
468+
test_df: pd.DataFrame,
469+
mock_get_table: MagicMock,
470+
mock_data_table: MagicMock,
471+
file_data: FileData,
472+
):
473+
mocker.patch.object(IbmWatsonxUploader, "_df_to_arrow_table", return_value=mock_data_table)
474+
mock_upload_data_table = mocker.patch.object(
475+
IbmWatsonxUploader, "upload_data_table", side_effect=ProviderError()
476+
)
477+
478+
with pytest.raises(ProviderError):
479+
uploader.upload_dataframe(test_df, file_data)
480+
mock_get_table.assert_called_once()
481+
mock_upload_data_table.assert_called_once()
482+
483+
484+
def test_ibm_watsonx_uploader_upload_dataframe_exception(
485+
mocker: MockerFixture,
486+
uploader: IbmWatsonxUploader,
487+
test_df: pd.DataFrame,
488+
mock_get_table: MagicMock,
489+
mock_data_table: MagicMock,
490+
file_data: FileData,
491+
):
492+
mocker.patch.object(IbmWatsonxUploader, "_df_to_arrow_table", return_value=mock_data_table)
493+
mock_upload_data_table = mocker.patch.object(
494+
IbmWatsonxUploader, "upload_data_table", side_effect=Exception()
495+
)
496+
497+
with pytest.raises(ProviderError):
498+
uploader.upload_dataframe(test_df, file_data)
499+
mock_get_table.assert_called_once()
500+
mock_upload_data_table.assert_called_once()
501+
502+
431503
def test_ibm_watsonx_uploader_delete_can_delete(
432504
mocker: MockerFixture,
433505
uploader: IbmWatsonxUploader,

unstructured_ingest/__version__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "1.0.12-dev0" # pragma: no cover
1+
__version__ = "1.0.12" # pragma: no cover

unstructured_ingest/processes/connectors/ibm_watsonx/ibm_watsonx_s3.py

+35-5
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ def object_storage_url(self) -> str:
6969

7070
@property
7171
def bearer_token(self) -> str:
72-
# Add 60 seconds to deal with edge cases where the token expires before the request is made
73-
timestamp = int(time.time()) + 60
72+
# Add 5 minutes to deal with edge cases where the token expires before the request is made
73+
timestamp = int(time.time()) + (60 * 5)
7474
if self._bearer_token is None or self._bearer_token.get("expiration", 0) <= timestamp:
7575
self._bearer_token = self.generate_bearer_token()
7676
return self._bearer_token["access_token"]
@@ -240,7 +240,7 @@ def _delete(self, transaction: "Transaction", identifier: str) -> None:
240240
def upload_data_table(
241241
self, table: "Table", data_table: "ArrowTable", file_data: FileData
242242
) -> None:
243-
from pyiceberg.exceptions import CommitFailedException
243+
from pyiceberg.exceptions import CommitFailedException, RESTError
244244
from tenacity import (
245245
before_log,
246246
retry,
@@ -265,21 +265,51 @@ def _upload_data_table(table: "Table", data_table: "ArrowTable", file_data: File
265265
table.refresh()
266266
logger.debug(e)
267267
raise IcebergCommitFailedException(e)
268+
except RESTError:
269+
raise
268270
except Exception as e:
269271
raise ProviderError(f"Failed to upload data to table: {e}")
270272

271273
try:
272274
return _upload_data_table(table, data_table, file_data)
275+
except RESTError:
276+
raise
273277
except ProviderError:
274278
raise
275279
except Exception as e:
276280
raise ProviderError(f"Failed to upload data to table: {e}")
277281

282+
@requires_dependencies(["pyiceberg", "tenacity"], extras="ibm-watsonx-s3")
278283
def upload_dataframe(self, df: "DataFrame", file_data: FileData) -> None:
284+
from pyiceberg.exceptions import RESTError
285+
from tenacity import (
286+
before_log,
287+
retry,
288+
retry_if_exception_type,
289+
stop_after_attempt,
290+
wait_random,
291+
)
292+
279293
data_table = self._df_to_arrow_table(df)
280294

281-
with self.get_table() as table:
282-
self.upload_data_table(table, data_table, file_data)
295+
# Retry once, re-opening the table, when the catalog request fails with a RESTError (e.g. an expired token)
296+
@retry(
297+
stop=stop_after_attempt(2),
298+
wait=wait_random(),
299+
retry=retry_if_exception_type(RESTError),
300+
before=before_log(logger, logging.DEBUG),
301+
reraise=True,
302+
)
303+
def _upload_dataframe(data_table: Any, file_data: FileData) -> None:
304+
with self.get_table() as table:
305+
self.upload_data_table(table, data_table, file_data)
306+
307+
try:
308+
return _upload_dataframe(data_table, file_data)
309+
except ProviderError:
310+
raise
311+
except Exception as e:
312+
raise ProviderError(f"Failed to upload data to table: {e}")
283313

284314
@requires_dependencies(["pandas"], extras="ibm-watsonx-s3")
285315
def run_data(self, data: list[dict], file_data: FileData, **kwargs: Any) -> None:

0 commit comments

Comments
 (0)