Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## [1.2.22]

* **feat: add teradata source and destination

## [1.2.21]

* **fix: Enforce minimum version of databricks-sdk (>=0.62.0) for databricks-volumes connector**
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ sharepoint = ["requirements/connectors/sharepoint.txt"]
singlestore = ["requirements/connectors/singlestore.txt"]
slack = ["requirements/connectors/slack.txt"]
snowflake = ["requirements/connectors/snowflake.txt"]
teradata = ["requirements/connectors/teradata.txt"]
vastdb = ["requirements/connectors/vastdb.txt"]
vectara = ["requirements/connectors/vectara.txt"]
weaviate = ["requirements/connectors/weaviate.txt"]
Expand Down
3 changes: 3 additions & 0 deletions requirements/connectors/teradata.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pandas
teradatasql

1 change: 1 addition & 0 deletions test/integration/connectors/test_jira.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)


@pytest.mark.skip(reason="Jira test instance unavailable (503 error)")
@pytest.mark.asyncio
@pytest.mark.tags(CONNECTOR_TYPE, SOURCE_TAG, UNCATEGORIZED_TAG)
@requires_env("JIRA_INGEST_USER_EMAIL", "JIRA_INGEST_API_TOKEN")
Expand Down
314 changes: 314 additions & 0 deletions test/unit/connectors/sql/test_teradata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
from unittest.mock import MagicMock

import pandas as pd
import pytest
from pydantic import Secret
from pytest_mock import MockerFixture

from unstructured_ingest.data_types.file_data import FileData, SourceIdentifiers
from unstructured_ingest.processes.connectors.sql.teradata import (
TeradataAccessConfig,
TeradataConnectionConfig,
TeradataDownloader,
TeradataDownloaderConfig,
TeradataUploader,
TeradataUploaderConfig,
TeradataUploadStager,
)


@pytest.fixture
def teradata_access_config():
return TeradataAccessConfig(password="test_password")


@pytest.fixture
def teradata_connection_config(teradata_access_config: TeradataAccessConfig):
return TeradataConnectionConfig(
host="test-host.teradata.com",
user="test_user",
database="test_db",
dbs_port=1025,
access_config=Secret(teradata_access_config),
)


@pytest.fixture
def teradata_uploader(teradata_connection_config: TeradataConnectionConfig):
return TeradataUploader(
connection_config=teradata_connection_config,
upload_config=TeradataUploaderConfig(table_name="test_table", record_id_key="record_id"),
)


@pytest.fixture
def teradata_downloader(teradata_connection_config: TeradataConnectionConfig):
return TeradataDownloader(
connection_config=teradata_connection_config,
download_config=TeradataDownloaderConfig(
fields=["id", "text", "year"],
id_column="id",
),
)


@pytest.fixture
def teradata_upload_stager():
return TeradataUploadStager()


@pytest.fixture
def mock_cursor(mocker: MockerFixture):
return mocker.MagicMock()


@pytest.fixture
def mock_get_cursor(mocker: MockerFixture, mock_cursor: MagicMock):
mock = mocker.patch(
"unstructured_ingest.processes.connectors.sql.teradata.TeradataConnectionConfig.get_cursor",
autospec=True,
)
mock.return_value.__enter__.return_value = mock_cursor
return mock


def test_teradata_connection_config_with_database(teradata_access_config: TeradataAccessConfig):
config = TeradataConnectionConfig(
host="test-host.teradata.com",
user="test_user",
database="my_database",
dbs_port=1025,
access_config=Secret(teradata_access_config),
)
assert config.database == "my_database"
assert config.dbs_port == 1025


def test_teradata_connection_config_default_port(teradata_access_config: TeradataAccessConfig):
config = TeradataConnectionConfig(
host="test-host.teradata.com",
user="test_user",
access_config=Secret(teradata_access_config),
)
assert config.dbs_port == 1025
assert config.database is None


def test_teradata_downloader_query_db_quotes_identifiers(
mock_cursor: MagicMock,
teradata_downloader: TeradataDownloader,
mock_get_cursor: MagicMock,
):
"""Test that query_db quotes all table and column names to handle reserved words."""
mock_cursor.fetchall.return_value = [
(1, "text1", 2020),
(2, "text2", 2021),
]
mock_cursor.description = [("id",), ("text",), ("year",)]

# Create proper mock structure for SqlBatchFileData
mock_item = MagicMock()
mock_item.identifier = "test_id"

batch_data = MagicMock()
batch_data.additional_metadata.table_name = "elements"
batch_data.additional_metadata.id_column = "id"
batch_data.batch_items = [mock_item]

_, _ = teradata_downloader.query_db(batch_data)

# Verify the SELECT statement quotes all identifiers
call_args = mock_cursor.execute.call_args[0][0]
assert '"id"' in call_args # Field name quoted
assert '"text"' in call_args # Field name quoted
assert '"year"' in call_args # Reserved word field quoted
assert '"elements"' in call_args # Table name quoted
# Verify WHERE clause also quotes the id column
assert 'WHERE "id" IN' in call_args


def test_teradata_downloader_query_db_returns_correct_data(
mock_cursor: MagicMock,
teradata_downloader: TeradataDownloader,
mock_get_cursor: MagicMock,
):
"""Test that query_db returns data in the expected format."""
mock_cursor.fetchall.return_value = [
(1, "text1", 2020),
(2, "text2", 2021),
]
mock_cursor.description = [("id",), ("text",), ("year",)]

# Create proper mock structure for SqlBatchFileData
mock_item = MagicMock()
mock_item.identifier = "test_id"

batch_data = MagicMock()
batch_data.additional_metadata.table_name = "elements"
batch_data.additional_metadata.id_column = "id"
batch_data.batch_items = [mock_item]

results, columns = teradata_downloader.query_db(batch_data)

assert results == [(1, "text1", 2020), (2, "text2", 2021)]
assert columns == ["id", "text", "year"]


def test_teradata_upload_stager_converts_lists_to_json(
teradata_upload_stager: TeradataUploadStager,
):
"""Test that conform_dataframe converts Python lists to JSON strings."""
df = pd.DataFrame(
{
"text": ["text1", "text2"],
"languages": [["en"], ["en", "fr"]],
"id": [1, 2],
}
)

result = teradata_upload_stager.conform_dataframe(df)

# languages column should be JSON strings now
assert isinstance(result["languages"].iloc[0], str)
assert result["languages"].iloc[0] == '["en"]'
assert result["languages"].iloc[1] == '["en", "fr"]'
# Other columns should be unchanged
assert result["text"].iloc[0] == "text1"
assert result["id"].iloc[0] == 1


def test_teradata_upload_stager_converts_dicts_to_json(
teradata_upload_stager: TeradataUploadStager,
):
"""Test that conform_dataframe converts Python dicts to JSON strings."""
df = pd.DataFrame(
{
"text": ["text1", "text2"],
"metadata": [{"key": "value1"}, {"key": "value2"}],
"id": [1, 2],
}
)

result = teradata_upload_stager.conform_dataframe(df)

# metadata column should be JSON strings now
assert isinstance(result["metadata"].iloc[0], str)
assert result["metadata"].iloc[0] == '{"key": "value1"}'
assert result["metadata"].iloc[1] == '{"key": "value2"}'


def test_teradata_upload_stager_handles_empty_dataframe(
teradata_upload_stager: TeradataUploadStager,
):
"""Test that conform_dataframe handles empty DataFrames."""
df = pd.DataFrame({"text": [], "languages": []})

result = teradata_upload_stager.conform_dataframe(df)

assert len(result) == 0
assert "text" in result.columns
assert "languages" in result.columns


def test_teradata_upload_stager_handles_none_values(
teradata_upload_stager: TeradataUploadStager,
):
"""Test that conform_dataframe handles None values in list/dict columns."""
df = pd.DataFrame(
{
"text": ["text1", "text2"],
"languages": [["en"], None],
}
)

result = teradata_upload_stager.conform_dataframe(df)

# First row should be JSON string, second should be None
assert result["languages"].iloc[0] == '["en"]'
assert pd.isna(result["languages"].iloc[1])


def test_teradata_uploader_get_table_columns_uses_top_syntax(
mock_cursor: MagicMock,
teradata_uploader: TeradataUploader,
mock_get_cursor: MagicMock,
):
"""Test that get_table_columns uses Teradata's TOP syntax instead of LIMIT."""
mock_cursor.description = [("id",), ("text",), ("type",)]

columns = teradata_uploader.get_table_columns()

# Verify the query uses TOP instead of LIMIT
call_args = mock_cursor.execute.call_args[0][0]
assert "SELECT TOP 1" in call_args
assert "LIMIT" not in call_args
assert columns == ["id", "text", "type"]


def test_teradata_uploader_delete_by_record_id_quotes_identifiers(
mock_cursor: MagicMock,
teradata_uploader: TeradataUploader,
mock_get_cursor: MagicMock,
):
"""Test that delete_by_record_id quotes table and column names."""
mock_cursor.rowcount = 5

file_data = FileData(
identifier="test_file.txt",
connector_type="local",
source_identifiers=SourceIdentifiers(
filename="test_file.txt", fullpath="/path/to/test_file.txt"
),
)

teradata_uploader.delete_by_record_id(file_data)

# Verify the DELETE statement quotes identifiers
call_args = mock_cursor.execute.call_args[0][0]
assert 'DELETE FROM "test_table"' in call_args
assert 'WHERE "record_id" = ?' in call_args


def test_teradata_uploader_upload_dataframe_quotes_column_names(
mocker: MockerFixture,
mock_cursor: MagicMock,
teradata_uploader: TeradataUploader,
mock_get_cursor: MagicMock,
):
"""Test that upload_dataframe quotes all column names in INSERT statement."""
df = pd.DataFrame(
{
"id": [1, 2],
"text": ["text1", "text2"],
"type": ["Title", "NarrativeText"],
"record_id": ["file1", "file1"],
}
)

# Mock _fit_to_schema to return the same df
mocker.patch.object(teradata_uploader, "_fit_to_schema", return_value=df)
# Mock can_delete to return False
mocker.patch.object(teradata_uploader, "can_delete", return_value=False)

file_data = FileData(
identifier="test_file.txt",
connector_type="local",
source_identifiers=SourceIdentifiers(
filename="test_file.txt", fullpath="/path/to/test_file.txt"
),
)

teradata_uploader.upload_dataframe(df, file_data)

# Verify the INSERT statement quotes all column names AND table name
call_args = mock_cursor.executemany.call_args[0][0]
assert '"id"' in call_args
assert '"text"' in call_args
assert '"type"' in call_args # Reserved word must be quoted
assert '"record_id"' in call_args
assert 'INSERT INTO "test_table"' in call_args # Table name must be quoted too


def test_teradata_uploader_values_delimiter_is_qmark(teradata_uploader: TeradataUploader):
"""Test that Teradata uses qmark (?) parameter style."""
assert teradata_uploader.values_delimiter == "?"
Original file line number Diff line number Diff line change
Expand Up @@ -755,36 +755,9 @@
}
}
},
{
"type": "UncategorizedText",
"element_id": "f35698f7adf4541a2bb6fe10e1c47ab8",
"text": "L",
"metadata": {
"filetype": "application/pdf",
"languages": [
"eng"
],
"page_number": 4,
"data_source": {
"url": "s3://utic-dev-tech-fixtures/small-pdf-set/2023-Jan-economic-outlook.pdf",
"version": "c7eed4fc056b089a98f6a3ad9ec9373e",
"record_locator": {
"protocol": "s3",
"remote_file_path": "s3://utic-dev-tech-fixtures/small-pdf-set/",
"metadata": {
"ingest-test": "custom metadata"
}
},
"date_created": "1720544414.0",
"date_modified": "1720544414.0",
"permissions_data": null,
"filesize_bytes": 2215938
}
}
},
{
"type": "NarrativeText",
"element_id": "59ff5bef6f9522074a0347b3fc30d9ba",
"element_id": "cf86df4360039e44a9d36c2156253dca",
"text": "In the United States, growth is projected to fall from 2.0 percent in 2022 to 1.4 percent in 2023 and 1.0 percent in 2024. With growth rebounding in the second half of 2024, growth in 2024 will be faster than in 2023 on a fourth-quarter-over-fourth-quarter basis, as in most advanced",
"metadata": {
"filetype": "application/pdf",
Expand All @@ -811,7 +784,7 @@
},
{
"type": "Footer",
"element_id": "7e03381d8b00018712cf4714181944d5",
"element_id": "8342c28e0cf94f8454bf1c8f4e5b5b8b",
"text": "International Monetary Fund | January 2023",
"metadata": {
"filetype": "application/pdf",
Expand All @@ -838,7 +811,7 @@
},
{
"type": "PageNumber",
"element_id": "e7ad52a3c6a360dcc012d8904b7d68bb",
"element_id": "6ba40e8cf2a64cad3f1484387a1d6e9b",
"text": "3",
"metadata": {
"filetype": "application/pdf",
Expand Down
Loading