"""Utility functions for Source Faker configuration validation and error handling.

These helpers are meant to be called from ``airbyte.sources.util.get_source``
when ``name == "source-faker"`` and a non-empty config is supplied: the config
is validated first (a ``ValueError`` raised here is re-raised at that call site
as ``PyAirbyteInputError``), and any recommendations are then emitted through
``warnings.warn`` as a ``UserWarning``.

NOTE(review): that call site uses ``warnings.warn`` but the accompanying change
does not add ``import warnings`` -- confirm the module already imports it.
"""

from __future__ import annotations

from typing import Any

# Upper bounds enforced by ``validate_faker_config``.
_MAX_COUNT = 1_000_000
_MAX_PARALLELISM = 32

# Fallbacks used by ``get_faker_config_recommendations`` when a key is absent.
_DEFAULT_COUNT = 100
_DEFAULT_PARALLELISM = 4

# Thresholds that drive the sizing recommendations.
_LARGE_COUNT = 10_000
_SMALL_COUNT = 100
_LARGE_DATASET_MIN_PARALLELISM = 8
_SMALL_DATASET_MAX_PARALLELISM = 4


def _is_strict_int(value: Any) -> bool:
    """Return True only for real integers.

    ``bool`` is a subclass of ``int``, so a plain ``isinstance(value, int)``
    would let ``True``/``False`` through as 1/0.
    """
    return isinstance(value, int) and not isinstance(value, bool)


def validate_faker_config(config: dict[str, Any]) -> dict[str, Any]:
    """Validate Source Faker configuration parameters.

    Only the keys ``count``, ``seed``, ``parallelism`` and ``always_updated``
    are checked, and only when present; unknown keys pass through untouched.
    No values are rewritten: the result is a shallow copy of the input.

    Args:
        config: Raw configuration dictionary for source-faker.

    Returns:
        A shallow copy of ``config`` once every present parameter is valid.

    Raises:
        ValueError: If a checked parameter has the wrong type or is out of
            range (booleans are not accepted where an integer is required).
    """
    validated_config = config.copy()

    if "count" in validated_config:
        count = validated_config["count"]
        if not _is_strict_int(count) or count <= 0:
            raise ValueError(
                f"Source Faker 'count' parameter must be a positive integer, got: {count}"
            )
        if count > _MAX_COUNT:
            raise ValueError(
                f"Source Faker 'count' parameter is too large ({count}). "
                "Consider using a smaller value for better performance."
            )

    if "seed" in validated_config:
        seed = validated_config["seed"]
        # NOTE(review): negative seeds are rejected here; confirm the connector
        # does not use a negative sentinel (e.g. -1 meaning "random") -- TODO confirm.
        if not _is_strict_int(seed) or seed < 0:
            raise ValueError(
                f"Source Faker 'seed' parameter must be a non-negative integer, got: {seed}"
            )

    if "parallelism" in validated_config:
        parallelism = validated_config["parallelism"]
        if not _is_strict_int(parallelism) or parallelism <= 0:
            raise ValueError(
                "Source Faker 'parallelism' parameter must be a positive integer, "
                f"got: {parallelism}"
            )
        if parallelism > _MAX_PARALLELISM:
            raise ValueError(
                f"Source Faker 'parallelism' parameter is too high ({parallelism}). "
                "Consider using a value between 1-32 for optimal performance."
            )

    if "always_updated" in validated_config:
        always_updated = validated_config["always_updated"]
        if not isinstance(always_updated, bool):
            raise ValueError(
                "Source Faker 'always_updated' parameter must be a boolean, "
                f"got: {always_updated}"
            )

    return validated_config


def get_faker_config_recommendations(config: dict[str, Any]) -> list[str]:
    """Get configuration recommendations for Source Faker based on the provided config.

    Missing ``count`` / ``parallelism`` keys fall back to 100 and 4. The
    function never raises on malformed values: if either value is present but
    not an integer, the sizing advice is skipped.

    Args:
        config: Source Faker configuration dictionary (validated or not).

    Returns:
        List of recommendation strings; empty when there is nothing to suggest.
    """
    recommendations: list[str] = []

    count = config.get("count", _DEFAULT_COUNT)
    parallelism = config.get("parallelism", _DEFAULT_PARALLELISM)

    # Both sizing hints compare the two numbers, so they only make sense when
    # both are real integers; comparing e.g. None or a str would raise TypeError.
    if _is_strict_int(count) and _is_strict_int(parallelism):
        if count > _LARGE_COUNT and parallelism < _LARGE_DATASET_MIN_PARALLELISM:
            recommendations.append(
                f"For large datasets (count={count}), consider increasing parallelism "
                f"from {parallelism} to 8-16 for better performance."
            )

        if count < _SMALL_COUNT and parallelism > _SMALL_DATASET_MAX_PARALLELISM:
            recommendations.append(
                f"For small datasets (count={count}), parallelism={parallelism} may be "
                "unnecessary. Consider reducing to 1-4 for simpler execution."
            )

    if "seed" not in config:
        recommendations.append(
            "Consider adding a 'seed' parameter for reproducible test data generation."
        )

    return recommendations