diff --git a/airbyte/_message_iterators.py b/airbyte/_message_iterators.py
index ea7040eb..3f6a4239 100644
--- a/airbyte/_message_iterators.py
+++ b/airbyte/_message_iterators.py
@@ -108,9 +108,8 @@ def generator() -> Generator[AirbyteMessage, None, None]:
                                 "datetime.datetime", record.get(AB_EXTRACTED_AT_COLUMN)
                             ).timestamp()
                         ),
-                        # `meta` and `namespace` are not handled:
                         meta=None,
-                        namespace=None,
+                        # namespace=None, # Some destinations need this undeclared if null
                     ),
                 )
 
diff --git a/airbyte/destinations/base.py b/airbyte/destinations/base.py
index b1482bc0..8593dac2 100644
--- a/airbyte/destinations/base.py
+++ b/airbyte/destinations/base.py
@@ -265,7 +265,7 @@ def _write_airbyte_message_stream(
         with as_temp_files(
             files_contents=[
                 self._hydrated_config,
-                catalog_provider.configured_catalog.model_dump_json(),
+                catalog_provider.configured_catalog.model_dump_json(exclude_none=True),
             ]
         ) as [
             config_file,
diff --git a/airbyte/sources/util.py b/airbyte/sources/util.py
index f2c52c11..84410367 100644
--- a/airbyte/sources/util.py
+++ b/airbyte/sources/util.py
@@ -135,6 +135,7 @@ def get_source( # noqa: PLR0913 # Too many arguments
 def get_benchmark_source(
     num_records: int | str = "5e5",
     *,
+    connector_name: str | None = None,
     install_if_missing: bool = True,
 ) -> Source:
     """Get a source for benchmarking.
@@ -150,6 +151,10 @@ def get_benchmark_source(
             500,000 records.
             Can be an integer (`1000`) or a string in scientific notation.
             For example, `"5e6"` will generate 5 million records.
+        connector_name: The connector to use for benchmarking. Defaults to "source-faker".
+            Valid options are "source-faker" and "source-e2e-test".
+            Note: source-e2e-test is slightly faster but has compatibility issues with
+            some destinations due to not generating final STATE or SUCCESS trace messages.
         install_if_missing: Whether to install the source if it is not available locally.
 
     Returns:
@@ -165,17 +170,36 @@ def get_benchmark_source(
                 input_value=str(num_records),
             ) from None
 
+    if connector_name is None:
+        connector_name = "source-faker"
+
+    if connector_name not in {"source-faker", "source-e2e-test"}:
+        raise PyAirbyteInputError(
+            message="Invalid connector name for benchmarking.",
+            input_value=connector_name,
+            guidance="Valid options are 'source-faker' and 'source-e2e-test'.",
+        )
+
+    if connector_name == "source-faker":
+        return get_source(
+            name="source-faker",
+            config={
+                "count": num_records,
+                "seed": 0,
+            },
+            streams="*",
+            install_if_missing=install_if_missing,
+        )
+
+    # source-e2e-test
     return get_source(
         name="source-e2e-test",
         docker_image=True,
-        # docker_image="airbyte/source-e2e-test:latest",
         config={
-            "type": "BENCHMARK",
-            "schema": "FIVE_STRING_COLUMNS",
-            "terminationCondition": {
-                "type": "MAX_RECORDS",
-                "max": num_records,
-            },
+            "type": "INFINITE_FEED",
+            "max_records": num_records,
+            "seed": 0,
+            "message_interval": 1000,
         },
         streams="*",
         install_if_missing=install_if_missing,
diff --git a/examples/manual_connector_test.py b/examples/manual_connector_test.py
new file mode 100644
index 00000000..2f4c4c87
--- /dev/null
+++ b/examples/manual_connector_test.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python3
+"""Helper script for manually testing source connectors with small record counts."""
+
+import json
+import subprocess
+import sys
+import tempfile
+from pathlib import Path
+
+CONNECTOR_IMAGE = "airbyte/source-e2e-test:dev"
+CONFIG = {
+    "type": "INFINITE_FEED",
+    "max_records": 5,
+    "seed": 0,
+    "message_interval": 1000,
+}
+CATALOG = {
+    "streams": [
+        {
+            "stream": {
+                "name": "data",
+                "json_schema": {},
+                "supported_sync_modes": ["full_refresh"],
+            },
+            "sync_mode": "full_refresh",
+            "destination_sync_mode": "overwrite",
+        }
+    ]
+}
+OUTPUT_FILE = "/tmp/manual_source_output.jsonl"
+TIMEOUT_SECONDS = 60
+
+
+def run_connector() -> None:
+    """Run the source connector in Docker and capture its stdout in OUTPUT_FILE.
+
+    Raises:
+        subprocess.CalledProcessError: If the connector exits with a non-zero status.
+        subprocess.TimeoutExpired: If the connector runs longer than TIMEOUT_SECONDS.
+    """
+    # A temporary directory (rather than `mkstemp`) leaves no open file descriptors
+    # behind and is removed even when the connector fails or times out.
+    with tempfile.TemporaryDirectory() as temp_dir:
+        config_file = Path(temp_dir) / "config.json"
+        catalog_file = Path(temp_dir) / "catalog.json"
+        config_file.write_text(json.dumps(CONFIG, indent=2), encoding="utf-8")
+        catalog_file.write_text(json.dumps(CATALOG, indent=2), encoding="utf-8")
+
+        cmd = [
+            "docker",
+            "run",
+            "--rm",
+            "-v",
+            f"{config_file}:/tmp/config.json",
+            "-v",
+            f"{catalog_file}:/tmp/catalog.json",
+            CONNECTOR_IMAGE,
+            "read",
+            "--config",
+            "/tmp/config.json",
+            "--catalog",
+            "/tmp/catalog.json",
+        ]
+
+        with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
+            result = subprocess.run(
+                cmd,
+                stdout=f,
+                stderr=subprocess.PIPE,
+                text=True,
+                timeout=TIMEOUT_SECONDS,
+                check=False,
+            )
+
+    if result.returncode != 0:
+        # Surface the connector's stderr; otherwise a failed run looks like empty output.
+        print(result.stderr, file=sys.stderr)
+        result.check_returncode()
+
+    # `splitlines()` yields an empty list for empty output, so the count stays accurate.
+    lines = Path(OUTPUT_FILE).read_text(encoding="utf-8").strip().splitlines()
+    print(f"Generated {len(lines)} lines in {OUTPUT_FILE}")
+    for i, line in enumerate(lines[:3], start=1):
+        print(f" {i}: {line}")
+
+
+if __name__ == "__main__":
+    print("Manual Source Connector Test")
+    run_connector()
+    print("Complete!")