diff --git a/airbyte/_executors/declarative.py b/airbyte/_executors/declarative.py index 2538236c..c73e5e6d 100644 --- a/airbyte/_executors/declarative.py +++ b/airbyte/_executors/declarative.py @@ -4,6 +4,8 @@ from __future__ import annotations import hashlib +import os +import sys import warnings from pathlib import Path from typing import IO, TYPE_CHECKING, Any, cast @@ -15,6 +17,8 @@ from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from airbyte._executors.base import Executor +from airbyte._executors.python import _get_pypi_python_requirements_cached +from airbyte._util.semver import check_python_version_compatibility if TYPE_CHECKING: @@ -22,6 +26,7 @@ from collections.abc import Iterator from airbyte._message_iterators import AirbyteMessageIterator + from airbyte.sources.registry import ConnectorMetadata def _suppress_cdk_pydantic_deprecation_warnings() -> None: @@ -45,6 +50,7 @@ def __init__( manifest: dict | Path, components_py: str | Path | None = None, components_py_checksum: str | None = None, + metadata: ConnectorMetadata | None = None, ) -> None: """Initialize a declarative executor. @@ -57,6 +63,7 @@ def __init__( _suppress_cdk_pydantic_deprecation_warnings() self.name = name + self.metadata = metadata self._manifest_dict: dict if isinstance(manifest, Path): self._manifest_dict = cast("dict", yaml.safe_load(manifest.read_text())) @@ -115,13 +122,28 @@ def execute( yield from source_entrypoint.run(parsed_args) def ensure_installation(self, *, auto_fix: bool = True) -> None: - """No-op. The declarative source is included with PyAirbyte.""" + """Check version compatibility for declarative sources.""" _ = auto_fix - pass + self._check_version_compatibility() def install(self) -> None: - """No-op. The declarative source is included with PyAirbyte.""" - pass + """Check version compatibility for declarative sources.""" + self._check_version_compatibility() + + def _check_version_compatibility(self) -> None: + """Check Python version compatibility for declarative connectors.""" + if "pytest" in sys.modules or os.getenv("CI") == "true": + return + + if not self.metadata or not hasattr(self.metadata, "pypi_package_name"): + return + + package_name = self.metadata.pypi_package_name + if not package_name: + package_name = f"airbyte-{self.name}" + + requires_python = _get_pypi_python_requirements_cached(package_name) + check_python_version_compatibility(package_name, requires_python) def uninstall(self) -> None: """No-op. The declarative source is included with PyAirbyte.""" diff --git a/airbyte/_executors/python.py b/airbyte/_executors/python.py index f3adcaac..88f8ee8e 100644 --- a/airbyte/_executors/python.py +++ b/airbyte/_executors/python.py @@ -5,25 +5,66 @@ import subprocess import sys from contextlib import suppress +from functools import lru_cache from pathlib import Path from shutil import rmtree from typing import TYPE_CHECKING, Literal +import requests from overrides import overrides from rich import print # noqa: A004 # Allow shadowing the built-in from airbyte import exceptions as exc from airbyte._executors.base import Executor from airbyte._util.meta import is_windows +from airbyte._util.semver import check_python_version_compatibility from airbyte._util.telemetry import EventState, log_install_state from airbyte._util.venv_util import get_bin_dir -from airbyte.constants import NO_UV +from airbyte.constants import AIRBYTE_OFFLINE_MODE, NO_UV +from airbyte.version import get_version if TYPE_CHECKING: from airbyte.sources.registry import ConnectorMetadata +@lru_cache(maxsize=128) +def _get_pypi_python_requirements_cached(package_name: str) -> str | None: + """Get the requires_python field from PyPI for a package. + + Args: + package_name: The PyPI package name to check + + Returns: + The requires_python string from PyPI, or None if unavailable + + Example: + For airbyte-source-hubspot, returns "<3.12,>=3.10" + """ + if AIRBYTE_OFFLINE_MODE: + return None + + url = f"https://pypi.org/pypi/{package_name}/json" + version = get_version() + + try: + response = requests.get( + url=url, + headers={"User-Agent": f"PyAirbyte/{version}" if version else "PyAirbyte"}, + timeout=10, + ) + + if not response.ok: + return None + + data = response.json() + if not data: + return None + return data.get("info", {}).get("requires_python") + except Exception: + return None + + class VenvExecutor(Executor): def __init__( self, @@ -125,6 +166,18 @@ def install(self) -> None: input_value=str(self.use_python), ) + package_name = ( + self.metadata.pypi_package_name + if self.metadata and self.metadata.pypi_package_name + else f"airbyte-{self.name}" + ) + requires_python = _get_pypi_python_requirements_cached(package_name) + check_python_version_compatibility(package_name, requires_python) + + self._run_subprocess_and_raise_on_failure( + [sys.executable, "-m", "venv", str(self._get_venv_path())] + ) + python_override: str | None = None if not NO_UV and isinstance(self.use_python, Path): python_override = str(self.use_python.absolute()) diff --git a/airbyte/_executors/util.py b/airbyte/_executors/util.py index e33a9c05..684a9f4e 100644 --- a/airbyte/_executors/util.py +++ b/airbyte/_executors/util.py @@ -318,11 +318,15 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0914, PLR0915, C901 # if not components_py_path.exists(): components_py_path = None - return DeclarativeExecutor( + executor = DeclarativeExecutor( name=name, manifest=source_manifest, components_py=components_py_path, + metadata=metadata, ) + if install_if_missing: + executor.ensure_installation() + return executor if isinstance(source_manifest, str | bool): # Source manifest is either a URL or a boolean (True) @@ -333,17 +337,21 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0914, PLR0915, C901 # ) ) - return DeclarativeExecutor( + executor = DeclarativeExecutor( name=name, manifest=manifest_dict, components_py=components_py, components_py_checksum=components_py_checksum, + metadata=metadata, ) + if install_if_missing: + executor.ensure_installation() + return executor # else: we are installing a connector in a Python virtual environment: try: - executor = VenvExecutor( + venv_executor = VenvExecutor( name=name, metadata=metadata, target_version=version, @@ -352,11 +360,11 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0914, PLR0915, C901 # use_python=use_python, ) if install_if_missing: - executor.ensure_installation() + venv_executor.ensure_installation() except Exception as e: log_install_state(name, state=EventState.FAILED, exception=e) raise else: # No exceptions were raised, so return the executor. - return executor + return venv_executor diff --git a/airbyte/_util/semver.py b/airbyte/_util/semver.py new file mode 100644 index 00000000..c36ef1ca --- /dev/null +++ b/airbyte/_util/semver.py @@ -0,0 +1,41 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +"""Semantic version utilities for PyAirbyte.""" + +import sys + +from packaging.specifiers import SpecifierSet +from packaging.version import Version + +from airbyte.logs import warn_once + + +def check_python_version_compatibility( + package_name: str, + requires_python: str | None, +) -> bool | None: + """Check if current Python version is compatible with package requirements. + + Returns True if confirmed, False if incompatible, or None if no determination can be made. + + Args: + package_name: Name of the package being checked + requires_python: The requires_python constraint from PyPI (e.g., "<3.12,>=3.10") + """ + if not requires_python: + return None + + current_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" + + spec_set = SpecifierSet(requires_python) + current_ver = Version(current_version) + + if current_ver not in spec_set: + warn_once( + f"Python version compatibility warning for '{package_name}': " + f"Current Python {current_version} may not be compatible with " + f"package requirement '{requires_python}'. " + f"Installation will proceed but may fail.", + with_stack=False, + ) + return False + return True diff --git a/poetry.lock b/poetry.lock index 14b0f99d..e88c9e85 100644 --- a/poetry.lock +++ b/poetry.lock @@ -5592,4 +5592,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = ">=3.10,<3.13" -content-hash = "5985ce31397bcad1e50080763a6f10f340267d000da79c9560a866700c579892" +content-hash = "6bbf3d5f66687710b1ee7e3ab2e0477981337feb9f9b3f7a2e2902a15236d9ee" diff --git a/pyproject.toml b/pyproject.toml index 676f15a3..0961ade1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ google-cloud-secret-manager = "^2.17.0" jsonschema = ">=3.2.0,<5.0" orjson = "^3.10" overrides = "^7.4.0" +packaging = ">=23.0" pandas = { version = ">=1.5.3,<3.0" } psycopg = {extras = ["binary", "pool"], version = "^3.1.19"} psycopg2-binary = "^2.9.9" diff --git a/tests/unit_tests/test_python_executor.py b/tests/unit_tests/test_python_executor.py new file mode 100644 index 00000000..dfe1b314 --- /dev/null +++ b/tests/unit_tests/test_python_executor.py @@ -0,0 +1,178 @@ +from unittest.mock import Mock, patch +import requests + +from airbyte._executors.python import _get_pypi_python_requirements_cached +from airbyte._util.semver import check_python_version_compatibility + + +class TestGetPypiPythonRequirementsCached: + """Test the _get_pypi_python_requirements_cached function.""" + + def test_offline_mode_returns_none(self): + """Test that offline mode returns None without making network calls.""" + with patch("airbyte._executors.python.AIRBYTE_OFFLINE_MODE", True): + _get_pypi_python_requirements_cached.cache_clear() + result = _get_pypi_python_requirements_cached("airbyte-source-hubspot") + assert result is None + + @patch("airbyte._executors.python.AIRBYTE_OFFLINE_MODE", False) + @patch("requests.get") + def test_successful_api_response(self, mock_get): + """Test successful PyPI API response returns requires_python.""" + mock_response = Mock() + mock_response.ok = True + mock_response.json.return_value = {"info": {"requires_python": "<3.12,>=3.10"}} + mock_get.return_value = mock_response + + _get_pypi_python_requirements_cached.cache_clear() + result = _get_pypi_python_requirements_cached("airbyte-source-hubspot") + + assert result == "<3.12,>=3.10" + mock_get.assert_called_once() + call_args = mock_get.call_args + assert ( + call_args[1]["url"] == "https://pypi.org/pypi/airbyte-source-hubspot/json" + ) + assert call_args[1]["timeout"] == 10 + + @patch("airbyte._executors.python.AIRBYTE_OFFLINE_MODE", False) + @patch("requests.get") + def test_package_not_found_returns_none(self, mock_get): + """Test that 404 response returns None.""" + mock_response = Mock() + mock_response.ok = False + mock_get.return_value = mock_response + + _get_pypi_python_requirements_cached.cache_clear() + result = _get_pypi_python_requirements_cached("nonexistent-package") + + assert result is None + + @patch("airbyte._executors.python.AIRBYTE_OFFLINE_MODE", False) + @patch("requests.get") + def test_json_parsing_error_returns_none(self, mock_get): + """Test that JSON parsing errors return None.""" + mock_response = Mock() + mock_response.ok = True + mock_response.json.side_effect = ValueError("Invalid JSON") + mock_get.return_value = mock_response + + _get_pypi_python_requirements_cached.cache_clear() + result = _get_pypi_python_requirements_cached("test-package") + + assert result is None + + @patch("airbyte._executors.python.AIRBYTE_OFFLINE_MODE", False) + @patch("requests.get") + def test_missing_requires_python_field_returns_none(self, mock_get): + """Test that missing requires_python field returns None.""" + mock_response = Mock() + mock_response.ok = True + mock_response.json.return_value = {"info": {"name": "test-package"}} + mock_get.return_value = mock_response + + _get_pypi_python_requirements_cached.cache_clear() + result = _get_pypi_python_requirements_cached("test-package") + + assert result is None + + @patch("airbyte._executors.python.AIRBYTE_OFFLINE_MODE", False) + @patch("requests.get") + def test_network_timeout_returns_none(self, mock_get): + """Test that network timeouts return None.""" + mock_get.side_effect = requests.exceptions.Timeout() + + _get_pypi_python_requirements_cached.cache_clear() + result = _get_pypi_python_requirements_cached("test-package") + + assert result is None + + @patch("airbyte._executors.python.AIRBYTE_OFFLINE_MODE", False) + @patch("requests.get") + def test_caching_behavior(self, mock_get): + """Test that lru_cache works correctly.""" + mock_response = Mock() + mock_response.ok = True + mock_response.json.return_value = {"info": {"requires_python": ">=3.8"}} + mock_get.return_value = mock_response + + _get_pypi_python_requirements_cached.cache_clear() + + result1 = _get_pypi_python_requirements_cached("test-package") + result2 = _get_pypi_python_requirements_cached("test-package") + + assert result1 == ">=3.8" + assert result2 == ">=3.8" + mock_get.assert_called_once() + + @patch("airbyte._executors.python.AIRBYTE_OFFLINE_MODE", False) + @patch("requests.get") + def test_user_agent_header(self, mock_get): + """Test that proper User-Agent header is sent.""" + mock_response = Mock() + mock_response.ok = True + mock_response.json.return_value = {"info": {"requires_python": ">=3.8"}} + mock_get.return_value = mock_response + + with patch("airbyte._executors.python.get_version", return_value="1.0.0"): + _get_pypi_python_requirements_cached.cache_clear() + _get_pypi_python_requirements_cached("test-package") + + call_args = mock_get.call_args + assert "PyAirbyte/1.0.0" in call_args[1]["headers"]["User-Agent"] + + @patch("airbyte._executors.python.AIRBYTE_OFFLINE_MODE", False) + @patch("requests.get") + def test_user_agent_header_no_version(self, mock_get): + """Test User-Agent header when version is None.""" + mock_response = Mock() + mock_response.ok = True + mock_response.json.return_value = {"info": {"requires_python": ">=3.8"}} + mock_get.return_value = mock_response + + with patch("airbyte._executors.python.get_version", return_value=None): + _get_pypi_python_requirements_cached.cache_clear() + _get_pypi_python_requirements_cached("test-package") + + call_args = mock_get.call_args + assert call_args[1]["headers"]["User-Agent"] == "PyAirbyte" + + +class TestVenvExecutorVersionCompatibility: + """Test version compatibility checking in VenvExecutor.""" + + def test_check_python_version_compatibility_no_requirements(self): + """Test that None requirements return None.""" + result = check_python_version_compatibility("test-package", None) + assert result is None + + @patch("airbyte._util.semver.warn_once") + def test_check_python_version_compatibility_incompatible(self, mock_warn): + """Test warning for incompatible Python version.""" + mock_version_info = Mock() + mock_version_info.major = 3 + mock_version_info.minor = 13 + mock_version_info.micro = 0 + + with patch("sys.version_info", mock_version_info): + result = check_python_version_compatibility("test-package", "<3.12,>=3.10") + + assert result is False + mock_warn.assert_called_once() + warning_message = mock_warn.call_args[0][0] + assert "Python version compatibility warning" in warning_message + assert "test-package" in warning_message + assert "3.13.0" in warning_message + assert "<3.12,>=3.10" in warning_message + + def test_check_python_version_compatibility_compatible(self): + """Test compatible Python version returns True.""" + mock_version_info = Mock() + mock_version_info.major = 3 + mock_version_info.minor = 11 + mock_version_info.micro = 5 + + with patch("sys.version_info", mock_version_info): + result = check_python_version_compatibility("test-package", "<3.12,>=3.10") + + assert result is True