Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ jobs:
OPENSEARCH_AWS_ACCESS_KEY_ID: ${{ secrets.OPENSEARCH_AWS_ACCESS_KEY_ID }}
OPENSEARCH_AWS_SECRET_ACCESS_KEY: ${{ secrets.OPENSEARCH_AWS_SECRET_ACCESS_KEY }}
OPENSEARCH_AWS_HOST: ${{ secrets.OPENSEARCH_AWS_HOST }}
OPENSEARCH_AOSS_HOST: ${{ secrets.OPENSEARCH_AOSS_HOST }}
# Redis
AZURE_REDIS_INGEST_TEST_PASSWORD: ${{ secrets.AZURE_REDIS_INGEST_TEST_PASSWORD }}
# Vectara
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## [1.4.5]

* **fix: add capability to use opensearch serverless**

## [1.4.4]

* **fix: add table precheck to teradata source**
Expand Down
300 changes: 300 additions & 0 deletions test/integration/connectors/elasticsearch/test_opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pytest
from _pytest.fixtures import TopRequest
from opensearchpy import Document, Keyword, OpenSearch, Text
from opensearchpy.exceptions import TransportError

from test.integration.connectors.utils.constants import DESTINATION_TAG, NOSQL_TAG, SOURCE_TAG
from test.integration.connectors.utils.docker import HealthCheck, container_context
Expand Down Expand Up @@ -400,6 +401,26 @@ def aws_credentials():
}


@pytest.fixture
def aoss_credentials():
    """Supply the AWS key pair and the AOSS host taken from environment variables.

    The requesting test is skipped when any of the three variables is unset or empty.
    """
    # Maps each key of the returned dict to the environment variable that feeds it.
    env_var_by_key = {
        "aws_access_key_id": "OPENSEARCH_AWS_ACCESS_KEY_ID",
        "aws_secret_access_key": "OPENSEARCH_AWS_SECRET_ACCESS_KEY",
        "aoss_host": "OPENSEARCH_AOSS_HOST",
    }
    credentials = {key: os.getenv(env_var) for key, env_var in env_var_by_key.items()}

    if any(not value for value in credentials.values()):
        pytest.skip(
            "AOSS credentials not available. Set OPENSEARCH_AWS_ACCESS_KEY_ID, "
            "OPENSEARCH_AWS_SECRET_ACCESS_KEY, and OPENSEARCH_AOSS_HOST environment variables."
        )

    return credentials


@pytest.mark.asyncio
@pytest.mark.tags(CONNECTOR_TYPE, SOURCE_TAG, NOSQL_TAG, "aws", "iam")
async def test_opensearch_source_with_iam(aws_credentials: dict):
Expand Down Expand Up @@ -618,10 +639,20 @@ def test_opensearch_destination_iam_precheck_fail_invalid_credentials():
("https://abc123xyz.us-east-1.aoss.amazonaws.com", "us-east-1", "aoss"),
("https://abc123xyz.eu-west-1.aoss.amazonaws.com", "eu-west-1", "aoss"),
("https://abc123xyz.us-gov-west-1.aoss.amazonaws.com", "us-gov-west-1", "aoss"),
# FIPS-compliant endpoints
("https://abc123xyz.us-east-1.aoss-fips.amazonaws.com", "us-east-1", "aoss"),
("https://abc123xyz.us-east-2.aoss-fips.amazonaws.com", "us-east-2", "aoss"),
("https://abc123xyz.us-gov-west-1.aoss-fips.amazonaws.com", "us-gov-west-1", "aoss"),
("https://abc123xyz.ca-central-1.aoss-fips.amazonaws.com", "ca-central-1", "aoss"),
("https://search-domain.us-east-1.es-fips.amazonaws.com", "us-east-1", "es"),
("https://search-domain.us-east-2.es-fips.amazonaws.com", "us-east-2", "es"),
("https://search-domain.us-gov-west-1.es-fips.amazonaws.com", "us-gov-west-1", "es"),
("https://search-domain.ca-central-1.es-fips.amazonaws.com", "ca-central-1", "es"),
# Without https://
("search-domain.us-east-1.es.amazonaws.com", "us-east-1", "es"),
# With port
("https://search-domain.us-east-1.es.amazonaws.com:443", "us-east-1", "es"),
("https://abc123xyz.us-east-1.aoss-fips.amazonaws.com:443", "us-east-1", "aoss"),
],
)
def test_detect_aws_opensearch_config_valid(hostname, expected_region, expected_service):
Expand Down Expand Up @@ -668,6 +699,112 @@ def test_opensearch_uploader_config_batch_size_default():
)


@pytest.mark.asyncio
@pytest.mark.tags(CONNECTOR_TYPE, SOURCE_TAG)
@pytest.mark.parametrize(
    ("status_code", "error_class"),
    [
        (403, "AuthorizationException"),
        (400, "RequestError"),
        (404, "NotFoundError"),
    ],
)
async def test_opensearch_indexer_pit_fallback_to_scroll(status_code, error_class):
    """Test that _get_doc_ids_async falls back to scroll when create_pit fails.

    Covers: 403 (missing permissions), 400 (unsupported endpoint), 404 (pre-2.4 OpenSearch).
    The fallback is scoped to the create_pit call only.

    Args:
        status_code: HTTP status carried by the simulated TransportError.
        error_class: Error string carried by the simulated TransportError.
    """
    # TransportError is already imported at module level (the sibling
    # "no fallback" test uses it in its parametrization), so it is not
    # re-imported here.
    from unittest.mock import AsyncMock, patch

    connection_config = OpenSearchConnectionConfig(
        access_config=OpenSearchAccessConfig(password="admin"),
        username="admin",
        hosts=["http://localhost:9200"],
        use_ssl=True,
    )
    indexer = OpenSearchIndexer(
        connection_config=connection_config,
        index_config=OpenSearchIndexerConfig(index_name="test_index"),
    )

    expected_ids = {"id1", "id2", "id3"}

    # Fake async client: create_pit always raises, and the client can be used
    # as an async context manager that yields itself.
    mock_client = AsyncMock()
    mock_client.create_pit = AsyncMock(side_effect=TransportError(status_code, error_class))
    mock_client.__aenter__ = AsyncMock(return_value=mock_client)
    mock_client.__aexit__ = AsyncMock(return_value=False)

    with (
        patch("opensearchpy.AsyncOpenSearch", return_value=mock_client),
        # Stub the scroll path so the test only checks that it is reached.
        patch.object(
            indexer, "_get_doc_ids_scroll", new_callable=AsyncMock, return_value=expected_ids
        ) as mock_scroll,
        # Avoid building real client kwargs from the connection config.
        patch.object(
            connection_config,
            "get_async_client_kwargs",
            new_callable=AsyncMock,
            return_value={"hosts": ["http://localhost:9200"]},
        ),
    ):
        result = await indexer._get_doc_ids_async()

    mock_client.create_pit.assert_called_once()
    mock_scroll.assert_called_once()
    assert result == expected_ids


@pytest.mark.asyncio
@pytest.mark.tags(CONNECTOR_TYPE, SOURCE_TAG)
@pytest.mark.parametrize(
    "exception",
    [
        TransportError(500, "internal_server_error"),
        ConnectionError("cluster unreachable"),
    ],
)
async def test_opensearch_indexer_pit_no_fallback_on_other_errors(exception):
    """Check that _get_doc_ids_async propagates non-fallback errors and never scrolls.

    Cases: a 500 TransportError (server error) and a ConnectionError (network).
    Neither may trigger the scroll fallback.
    """
    from unittest.mock import AsyncMock, patch

    client_kwargs = {"hosts": ["http://localhost:9200"]}
    access = OpenSearchAccessConfig(password="admin")
    conn_cfg = OpenSearchConnectionConfig(
        access_config=access,
        username="admin",
        hosts=["http://localhost:9200"],
        use_ssl=True,
    )
    index_cfg = OpenSearchIndexerConfig(index_name="test_index")
    indexer = OpenSearchIndexer(connection_config=conn_cfg, index_config=index_cfg)

    # Fake async client whose create_pit raises the parametrized exception and
    # which yields itself when used as an async context manager.
    fake_client = AsyncMock()
    fake_client.create_pit = AsyncMock(side_effect=exception)
    fake_client.__aenter__ = AsyncMock(return_value=fake_client)
    fake_client.__aexit__ = AsyncMock(return_value=False)

    expected_error = type(exception)
    with (
        patch("opensearchpy.AsyncOpenSearch", return_value=fake_client),
        patch.object(indexer, "_get_doc_ids_scroll", new_callable=AsyncMock) as scroll_spy,
        patch.object(
            conn_cfg,
            "get_async_client_kwargs",
            new_callable=AsyncMock,
            return_value=client_kwargs,
        ),
        pytest.raises(expected_error),
    ):
        await indexer._get_doc_ids_async()

    fake_client.create_pit.assert_called_once()
    scroll_spy.assert_not_called()


@pytest.mark.asyncio
@pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG)
async def test_opensearch_connection_config_retry_settings():
Expand All @@ -691,3 +828,166 @@ async def test_opensearch_connection_config_retry_settings():
)
assert client_kwargs.get("retry_on_timeout") is True, "Should retry on timeout"
assert client_kwargs.get("timeout") == 60, "Should have 60 second timeout"


# AWS OpenSearch Serverless (AOSS) Integration Tests
# These tests require OPENSEARCH_AOSS_HOST in addition to the standard AWS credentials.
# The AOSS collection must have data access policies granting the IAM user
# aoss:ReadDocument and aoss:WriteDocument permissions.


@pytest.mark.asyncio
@pytest.mark.tags(CONNECTOR_TYPE, SOURCE_TAG, NOSQL_TAG, "aws", "aoss")
async def test_opensearch_aoss_source(aoss_credentials: dict):
    """Test OpenSearch source connector against a live AOSS collection.

    Validates the PIT + search_after pagination path, which is required for AOSS
    (scroll is not supported on serverless). Runs indexer and downloader directly
    rather than through source_connector_validation to avoid fixture dependencies.

    Args:
        aoss_credentials: Dict with "aws_access_key_id", "aws_secret_access_key"
            and "aoss_host", as provided by the aoss_credentials fixture.
    """
    import asyncio

    indexer_config = OpenSearchIndexerConfig(index_name="opensearch_serverless_e2e_source")

    with tempfile.TemporaryDirectory() as tempdir:
        connection_config = OpenSearchConnectionConfig(
            access_config=OpenSearchAccessConfig(
                aws_access_key_id=aoss_credentials["aws_access_key_id"],
                aws_secret_access_key=aoss_credentials["aws_secret_access_key"],
            ),
            hosts=[aoss_credentials["aoss_host"]],
            use_ssl=True,
            verify_certs=True,
        )
        download_config = OpenSearchDownloaderConfig(download_dir=Path(tempdir))

        indexer = OpenSearchIndexer(
            connection_config=connection_config, index_config=indexer_config
        )
        downloader = OpenSearchDownloader(
            connection_config=connection_config, download_config=download_config
        )

        # precheck is synchronous and is deliberately run on a worker thread,
        # as before. NOTE(review): presumably it drives its own event loop and
        # so cannot run on the test's loop thread - confirm. Awaiting
        # asyncio.to_thread keeps this loop unblocked while it runs.
        await asyncio.to_thread(indexer.precheck)

        batch_count = 0
        total_downloaded = 0
        async for file_data in indexer.run_async():
            assert file_data
            batch_count += 1
            resp = await downloader.run_async(file_data=file_data)
            assert resp
            total_downloaded += len(resp)

        # Expected totals are tied to the contents of the live source index.
        assert batch_count == 11, f"Expected 11 batches, got {batch_count}"
        assert total_downloaded == 1010, f"Expected 1010 documents, got {total_downloaded}"


@pytest.mark.asyncio
@pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG, NOSQL_TAG, "aws", "aoss")
async def test_opensearch_aoss_destination(
    upload_file: Path,
    tmp_path: Path,
    aoss_credentials: dict,
):
    """Test OpenSearch destination connector against a live AOSS collection.

    Validates upload via async_bulk to serverless. Note: AOSS has eventual
    consistency (10-60s propagation), so we only verify no exceptions are raised.

    Args:
        upload_file: Path to the elements file that is staged and uploaded.
        tmp_path: Directory that receives the staged output.
        aoss_credentials: Dict with "aws_access_key_id", "aws_secret_access_key"
            and "aoss_host", as provided by the aoss_credentials fixture.
    """
    import asyncio

    # Stager and uploader must target the same index; define the name once.
    index_name = "opensearch_serverless_e2e_destination"

    file_data = FileData(
        source_identifiers=SourceIdentifiers(fullpath=upload_file.name, filename=upload_file.name),
        connector_type=CONNECTOR_TYPE,
        identifier="mock file data aoss test",
    )

    connection_config = OpenSearchConnectionConfig(
        access_config=OpenSearchAccessConfig(
            aws_access_key_id=aoss_credentials["aws_access_key_id"],
            aws_secret_access_key=aoss_credentials["aws_secret_access_key"],
        ),
        hosts=[aoss_credentials["aoss_host"]],
        use_ssl=True,
        verify_certs=True,
    )

    stager = OpenSearchUploadStager(
        upload_stager_config=OpenSearchUploadStagerConfig(index_name=index_name)
    )

    uploader = OpenSearchUploader(
        connection_config=connection_config,
        upload_config=OpenSearchUploaderConfig(index_name=index_name),
    )

    staged_filepath = stager.run(
        elements_filepath=upload_file,
        file_data=file_data,
        output_dir=tmp_path,
        output_filename=upload_file.name,
    )

    # precheck is synchronous and is deliberately run on a worker thread,
    # as before. NOTE(review): presumably it drives its own event loop and so
    # cannot run on the test's loop thread - confirm. Awaiting
    # asyncio.to_thread keeps this loop unblocked while it runs.
    await asyncio.to_thread(uploader.precheck)

    # No read-back assertion: success means the upload raised nothing.
    await uploader.run_async(path=staged_filepath, file_data=file_data)


@pytest.mark.tags(CONNECTOR_TYPE, SOURCE_TAG, NOSQL_TAG, "aws", "aoss")
def test_opensearch_aoss_precheck_validates_connection(aoss_credentials: dict):
    """Indexer precheck must complete without raising against a live AOSS collection."""
    access = OpenSearchAccessConfig(
        aws_access_key_id=aoss_credentials["aws_access_key_id"],
        aws_secret_access_key=aoss_credentials["aws_secret_access_key"],
    )
    conn_cfg = OpenSearchConnectionConfig(
        access_config=access,
        hosts=[aoss_credentials["aoss_host"]],
        use_ssl=True,
        verify_certs=True,
    )
    index_cfg = OpenSearchIndexerConfig(index_name="opensearch_serverless_e2e_source")

    # The test passes as long as precheck raises nothing.
    OpenSearchIndexer(connection_config=conn_cfg, index_config=index_cfg).precheck()


@pytest.mark.tags(CONNECTOR_TYPE, SOURCE_TAG, NOSQL_TAG, "aws", "aoss")
def test_opensearch_aoss_precheck_fail_invalid_credentials():
    """Indexer precheck must raise SourceConnectionError for bogus IAM keys on AOSS.

    Only the AOSS host is needed from the environment; the test is skipped
    when OPENSEARCH_AOSS_HOST is unset or empty.
    """
    host = os.getenv("OPENSEARCH_AOSS_HOST")
    if not host:
        pytest.skip("OPENSEARCH_AOSS_HOST not set")

    # Deliberately invalid key pair pointed at the real AOSS endpoint.
    bogus_access = OpenSearchAccessConfig(
        aws_access_key_id="INVALID_KEY",
        aws_secret_access_key="INVALID_SECRET",
    )
    conn_cfg = OpenSearchConnectionConfig(
        access_config=bogus_access,
        hosts=[host],
        use_ssl=True,
        verify_certs=True,
    )
    index_cfg = OpenSearchIndexerConfig(index_name="opensearch_serverless_e2e_source")
    indexer = OpenSearchIndexer(connection_config=conn_cfg, index_config=index_cfg)

    with pytest.raises(SourceConnectionError):
        indexer.precheck()
2 changes: 1 addition & 1 deletion unstructured_ingest/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.4.4" # pragma: no cover
__version__ = "1.4.5" # pragma: no cover
Loading