Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions airbyte/sources/faker_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""Utility functions for Source Faker configuration validation and error handling."""

from __future__ import annotations

from typing import Any


def validate_faker_config(config: dict[str, Any]) -> dict[str, Any]:
    """Validate Source Faker configuration parameters.

    Only the keys ``count``, ``seed``, ``parallelism`` and ``always_updated``
    are inspected, and only when they are present. Every other key is passed
    through untouched. The input dictionary is never modified; a shallow copy
    is returned.

    Args:
        config: Raw configuration dictionary for source-faker.

    Returns:
        A shallow copy of ``config`` whose checked parameters are valid.

    Raises:
        ValueError: If ``count`` is not an integer in ``1..1_000_000``, if
            ``seed`` is not a non-negative integer, if ``parallelism`` is not
            an integer in ``1..32``, or if ``always_updated`` is not a bool.
            Booleans are rejected for the integer parameters.
    """

    def _is_int(value: object) -> bool:
        # bool is a subclass of int, so a plain isinstance(value, int) check
        # would let True/False through as "integers"; exclude it explicitly.
        return isinstance(value, int) and not isinstance(value, bool)

    validated_config = config.copy()

    if "count" in validated_config:
        count = validated_config["count"]
        if not _is_int(count) or count <= 0:
            raise ValueError(
                f"Source Faker 'count' parameter must be a positive integer, got: {count}"
            )
        if count > 1000000:
            raise ValueError(
                f"Source Faker 'count' parameter is too large ({count}). "
                "Consider using a smaller value for better performance."
            )

    if "seed" in validated_config:
        seed = validated_config["seed"]
        if not _is_int(seed) or seed < 0:
            raise ValueError(
                f"Source Faker 'seed' parameter must be a non-negative integer, got: {seed}"
            )

    if "parallelism" in validated_config:
        parallelism = validated_config["parallelism"]
        if not _is_int(parallelism) or parallelism <= 0:
            raise ValueError(
                f"Source Faker 'parallelism' parameter must be a positive integer, got: {parallelism}"
            )
        if parallelism > 32:
            raise ValueError(
                f"Source Faker 'parallelism' parameter is too high ({parallelism}). "
                "Consider using a value between 1-32 for optimal performance."
            )

    if "always_updated" in validated_config:
        always_updated = validated_config["always_updated"]
        if not isinstance(always_updated, bool):
            raise ValueError(
                f"Source Faker 'always_updated' parameter must be a boolean, got: {always_updated}"
            )

    return validated_config


def get_faker_config_recommendations(config: dict[str, Any]) -> list[str]:
    """Build advisory recommendations for a Source Faker configuration.

    This function never raises on odd values: when ``count`` or
    ``parallelism`` is not a real number (for example ``None`` or a string),
    the size-based recommendations are skipped instead of failing on the
    comparison. Rejecting such values is left to the validation step.

    Args:
        config: Source Faker configuration dictionary. A missing ``count``
            is treated as 100 and a missing ``parallelism`` as 4.

    Returns:
        List of recommendation strings, possibly empty. Order is: large
        dataset hint, small dataset hint, missing-seed hint.
    """

    def _is_number(value: object) -> bool:
        # bool is an int subclass; exclude it so True/False are not compared
        # as 1/0 and echoed back in the messages.
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    recommendations: list[str] = []

    count = config.get("count", 100)
    parallelism = config.get("parallelism", 4)

    # Only compare sizes when both values are numeric; anything else would
    # raise TypeError from the ordering operators.
    if _is_number(count) and _is_number(parallelism):
        if count > 10000 and parallelism < 8:
            recommendations.append(
                f"For large datasets (count={count}), consider increasing parallelism "
                f"from {parallelism} to 8-16 for better performance."
            )

        if count < 100 and parallelism > 4:
            recommendations.append(
                f"For small datasets (count={count}), parallelism={parallelism} may be "
                "unnecessary. Consider reducing to 1-4 for simpler execution."
            )

    if "seed" not in config:
        recommendations.append(
            "Consider adding a 'seed' parameter for reproducible test data generation."
        )

    return recommendations
18 changes: 18 additions & 0 deletions airbyte/sources/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from airbyte._executors.util import get_connector_executor
from airbyte.exceptions import PyAirbyteInputError
from airbyte.sources.base import Source
from airbyte.sources.faker_utils import validate_faker_config, get_faker_config_recommendations


if TYPE_CHECKING:
Expand Down Expand Up @@ -112,6 +113,23 @@ def get_source( # noqa: PLR0913 # Too many arguments
install_root: (Optional.) The root directory where the virtual environment will be
created. If not provided, the current working directory will be used.
"""
if name == "source-faker" and config:
try:
config = validate_faker_config(config)
recommendations = get_faker_config_recommendations(config)
if recommendations:
warnings.warn(
f"Source Faker configuration recommendations:\n" +
"\n".join(f" • {rec}" for rec in recommendations),
UserWarning,
stacklevel=2
)
except ValueError as e:
raise PyAirbyteInputError(
message=f"Invalid Source Faker configuration: {e}",
guidance="Please check your Source Faker configuration parameters and try again.",
) from e

return Source(
name=name,
config=config,
Expand Down
Loading