Skip to content

chore(deps): remove constraint on request lib #639

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Mar 25, 2025
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b7aea0c
feat: remove 'cdk:low-code' logic with false-positives for manifest-o…
aaronsteers Mar 25, 2025
59e5220
chore: bump airbyte-cdk version in lock file
aaronsteers Mar 25, 2025
14706b1
chore: update hardcoded failures list
aaronsteers Mar 25, 2025
78980e1
fix: raise `AirbyteConnectorInstallationError` when manifest.yml not …
aaronsteers Mar 25, 2025
1fe27d1
improve test logic
aaronsteers Mar 25, 2025
f243d66
Update airbyte/sources/registry.py
aaronsteers Mar 25, 2025
a97776c
feat: add auto-print of log file in CI
aaronsteers Mar 25, 2025
1681a27
skip broken test
aaronsteers Mar 25, 2025
ca5e0ed
upload logs dir
aaronsteers Mar 25, 2025
f89c843
Update airbyte/exceptions.py
aaronsteers Mar 25, 2025
226e8d4
fix logic
aaronsteers Mar 25, 2025
26b5967
add volume-mapped CLI arg translation for docker container executors
aaronsteers Mar 25, 2025
9b8c525
temp debug change: print directly
aaronsteers Mar 25, 2025
0f8ed4b
try: bare /airbyte dir
aaronsteers Mar 25, 2025
c52e53d
remove extra log output
aaronsteers Mar 25, 2025
b113026
try var directory
aaronsteers Mar 25, 2025
95e63fa
grant read to all users
aaronsteers Mar 25, 2025
a1fde04
don't run flaky tests in pytest-fast
aaronsteers Mar 25, 2025
612d280
clean up logging
aaronsteers Mar 25, 2025
6470537
Update airbyte/_executors/docker.py
aaronsteers Mar 25, 2025
be34e3e
chore(deps): release max requests lib constraint
aaronsteers Mar 25, 2025
0d65819
chore: poetry lock
aaronsteers Mar 25, 2025
4ffce1c
Merge branch 'main' into aj/chore-deps/use-latest-requests-lib
aaronsteers Mar 25, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions .github/workflows/python_pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,6 @@ jobs:
--durations=5 --exitfirst
-m "not slow and not requires_creds and not linting and not flaky"

- name: Run Pytest with Coverage (Flaky Tests Only)
timeout-minutes: 60
continue-on-error: true
env:
GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }}
run: >
poetry run coverage run -m pytest
--durations=5 --exitfirst
-m "flaky and not slow and not requires_creds"

- name: Print Coverage Report
if: always()
run: poetry run coverage report
Expand All @@ -75,6 +65,13 @@ jobs:
name: fasttest-coverage
path: htmlcov/

- name: Upload logs to GitHub Artifacts
if: failure()
uses: actions/upload-artifact@v4
with:
name: pytest-fast-test-logs
path: /tmp/airbyte/logs/

pytest-no-creds:
name: Pytest (No Creds)
runs-on: ubuntu-latest
Expand Down Expand Up @@ -122,6 +119,13 @@ jobs:
name: nocreds-test-coverage
path: htmlcov/

- name: Upload logs to GitHub Artifacts
if: failure()
uses: actions/upload-artifact@v4
with:
name: pytest-no-creds-test-logs
path: /tmp/airbyte/logs/

pytest:
name: Pytest (All, Python ${{ matrix.python-version }}, ${{ matrix.os }})
# Don't run on forks. Run on pushes to main, and on PRs that are not from forks.
Expand Down Expand Up @@ -189,6 +193,13 @@ jobs:
name: py${{ matrix.python-version }}-${{ matrix.os }}-test-coverage
path: htmlcov/

- name: Upload logs to GitHub Artifacts
if: failure()
uses: actions/upload-artifact@v4
with:
name: py${{ matrix.python-version }}-${{ matrix.os }}-test-logs
path: /tmp/airbyte/logs/

dependency-analysis:
name: Dependency Analysis with Deptry
runs-on: ubuntu-latest
Expand Down
11 changes: 10 additions & 1 deletion airbyte/_executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ def _cli(self) -> list[str]:
"""
...

def map_cli_args(self, args: list[str]) -> list[str]:
"""Map CLI args if needed.

By default, this is a no-op. Subclasses may override this method in order to
map CLI args into the format expected by the connector.
"""
return args

def execute(
self,
args: list[str],
Expand All @@ -195,8 +203,9 @@ def execute(

If stdin is provided, it will be passed to the subprocess as STDIN.
"""
mapped_args = self.map_cli_args(args)
with _stream_from_subprocess(
[*self._cli, *args],
[*self._cli, *mapped_args],
stdin=stdin,
) as stream_lines:
yield from stream_lines
Expand Down
5 changes: 4 additions & 1 deletion airbyte/_executors/declarative.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@


if TYPE_CHECKING:
from argparse import Namespace
from collections.abc import Iterator

from airbyte._message_iterators import AirbyteMessageIterator
Expand Down Expand Up @@ -102,7 +103,9 @@ def execute(
"""Execute the declarative source."""
_ = stdin # Not used
source_entrypoint = AirbyteEntrypoint(self.declarative_source)
parsed_args = source_entrypoint.parse_args(args)

mapped_args: list[str] = self.map_cli_args(args)
parsed_args: Namespace = source_entrypoint.parse_args(mapped_args)
yield from source_entrypoint.run(parsed_args)

def ensure_installation(self, *, auto_fix: bool = True) -> None:
Expand Down
48 changes: 48 additions & 0 deletions airbyte/_executors/docker.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,33 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from __future__ import annotations

import logging
import shutil
from pathlib import Path
from typing import NoReturn

from airbyte import exceptions as exc
from airbyte._executors.base import Executor


logger = logging.getLogger("airbyte")


DEFAULT_AIRBYTE_CONTAINER_TEMP_DIR = "/airbyte/tmp"
"""Default temp dir in an Airbyte connector's Docker image."""


class DockerExecutor(Executor):
def __init__(
self,
name: str | None = None,
*,
executable: list[str],
target_version: str | None = None,
volumes: dict[Path, str] | None = None,
) -> None:
self.executable: list[str] = executable
self.volumes: dict[Path, str] = volumes or {}
name = name or executable[0]
super().__init__(name=name, target_version=target_version)

Expand Down Expand Up @@ -56,3 +67,40 @@ def uninstall(self) -> NoReturn:
def _cli(self) -> list[str]:
"""Get the base args of the CLI executable."""
return self.executable

def map_cli_args(self, args: list[str]) -> list[str]:
"""Map local file paths to the container's volume paths."""
new_args = []
for arg in args:
if Path(arg).exists():
# This is a file path and we need to map it to the same file within the
# relative path of the file within the container's volume.
for local_volume, container_path in self.volumes.items():
if Path(arg).is_relative_to(local_volume):
logger.debug(
f"Found file input path `{arg}` "
f"relative to container-mapped volume: {local_volume}"
)
mapped_path = Path(container_path) / Path(arg).relative_to(local_volume)
logger.debug(f"Mapping `{arg}` -> `{mapped_path}`")
new_args.append(str(mapped_path))
break
else:
# No break reached; a volume was found for this file path
logger.warning(
f"File path `{arg}` is not relative to any volume path "
f"in the provided volume mappings: {self.volumes}. "
"The file may not be available to the container at runtime."
)
new_args.append(arg)

else:
new_args.append(arg)

if args != new_args:
logger.debug(
f"Mapping local-to-container CLI args: {args} -> {new_args} "
f"based upon volume definitions: {self.volumes}"
)

return new_args
33 changes: 23 additions & 10 deletions airbyte/_executors/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from airbyte import exceptions as exc
from airbyte._executors.declarative import DeclarativeExecutor
from airbyte._executors.docker import DockerExecutor
from airbyte._executors.docker import DEFAULT_AIRBYTE_CONTAINER_TEMP_DIR, DockerExecutor
from airbyte._executors.local import PathExecutor
from airbyte._executors.python import VenvExecutor
from airbyte._util.meta import which
Expand Down Expand Up @@ -42,8 +42,8 @@ def _try_get_source_manifest(

Raises:
- `PyAirbyteInputError`: If `source_name` is `None`.
- `HTTPError`: If fetching the URL was unsuccessful.
- `YAMLError`: If parsing the YAML failed.
- `AirbyteConnectorInstallationError`: If the registry file cannot be downloaded or if the
manifest YAML cannot be parsed.
"""
if source_name is None:
raise exc.PyAirbyteInputError(
Expand All @@ -62,7 +62,16 @@ def _try_get_source_manifest(
url=manifest_url,
headers={"User-Agent": f"PyAirbyte/{get_version()}"},
)
response.raise_for_status() # Raise HTTPError exception if the download failed
try:
response.raise_for_status() # Raise HTTPError exception if the download failed
except requests.exceptions.HTTPError as ex:
raise exc.AirbyteConnectorInstallationError(
message="Failed to download the connector manifest.",
context={
"manifest_url": manifest_url,
},
) from ex

try:
return cast("dict", yaml.safe_load(response.text))
except yaml.YAMLError as ex:
Expand Down Expand Up @@ -115,7 +124,7 @@ def _get_local_executor(
)


def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too many branches/arugments/statements
def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too many branches/arguments/statements
name: str,
*,
version: str | None = None,
Expand Down Expand Up @@ -217,21 +226,24 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too many branch
if ":" not in docker_image:
docker_image = f"{docker_image}:{version or 'latest'}"

temp_dir = TEMP_DIR_OVERRIDE or Path(tempfile.gettempdir())
host_temp_dir = TEMP_DIR_OVERRIDE or Path(tempfile.gettempdir())
container_temp_dir = DEFAULT_AIRBYTE_CONTAINER_TEMP_DIR

local_mount_dir = Path().absolute() / name
local_mount_dir.mkdir(exist_ok=True)

volumes = {
local_mount_dir: "/local",
host_temp_dir: container_temp_dir,
}
docker_cmd = [
"docker",
"run",
"--rm",
"-i",
"--volume",
f"{local_mount_dir}:/local/",
"--volume",
f"{temp_dir}:{temp_dir}",
]
for local_dir, container_dir in volumes.items():
docker_cmd.extend(["--volume", f"{local_dir}:{container_dir}"])

if use_host_network is True:
docker_cmd.extend(["--network", "host"])
Expand All @@ -241,6 +253,7 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too many branch
return DockerExecutor(
name=name,
executable=docker_cmd,
volumes=volumes,
)

if source_manifest:
Expand Down
4 changes: 4 additions & 0 deletions airbyte/_util/temp_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from __future__ import annotations

import json
import stat
import tempfile
import time
import warnings
Expand Down Expand Up @@ -36,6 +37,9 @@ def as_temp_files(files_contents: list[dict | str]) -> Generator[list[str], Any,
json.dumps(content) if isinstance(content, dict) else content,
)
temp_file.flush()
# Grant "read" permission to all users
Path(temp_file.name).chmod(stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)

# Don't close the file yet (breaks Windows)
# temp_file.close()
temp_files.append(temp_file)
Expand Down
15 changes: 15 additions & 0 deletions airbyte/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,18 @@ def _str_to_bool(value: str) -> bool:
This setting helps you make informed choices about data privacy and operation in restricted and
air-gapped environments.
"""

AIRBYTE_PRINT_FULL_ERROR_LOGS: bool = _str_to_bool(
os.getenv(
key="AIRBYTE_PRINT_FULL_ERROR_LOGS",
default=os.getenv("CI", "false"),
)
)
"""Whether to print full error logs when an error occurs.
This setting helps in debugging by providing detailed logs when errors occur. This is especially
helpful in ephemeral environments like CI/CD pipelines where log files may not be persisted after
the pipeline run.
If not set, the default value is `False` for non-CI environments.
If running in a CI environment ("CI" env var is set), then the default value is `True`.
"""
29 changes: 26 additions & 3 deletions airbyte/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
from textwrap import indent
from typing import TYPE_CHECKING, Any

from airbyte.constants import AIRBYTE_PRINT_FULL_ERROR_LOGS


if TYPE_CHECKING:
from airbyte._util.api_duck_types import AirbyteApiResponseDuckType
Expand All @@ -68,6 +70,7 @@ class PyAirbyteError(Exception):
help_url: str | None = None
log_text: str | list[str] | None = None
log_file: Path | None = None
print_full_log: bool = AIRBYTE_PRINT_FULL_ERROR_LOGS
context: dict[str, Any] | None = None
message: str | None = None
original_exception: Exception | None = None
Expand All @@ -93,6 +96,7 @@ def __str__(self) -> str:
"log_text",
"context",
"log_file",
"print_full_log",
"original_exception",
]
display_properties = {
Expand All @@ -119,9 +123,6 @@ def __str__(self) -> str:
if context_str:
exception_str += "\n " + context_str

if self.log_file:
exception_str += f"\n Log file: {self.log_file.absolute()!s}"

if self.log_text:
if isinstance(self.log_text, list):
self.log_text = "\n".join(self.log_text)
Expand All @@ -131,6 +132,28 @@ def __str__(self) -> str:
if self.original_exception:
exception_str += VERTICAL_SEPARATOR + f"\nCaused by: {self.original_exception!s}"

if self.log_file:
if self.print_full_log:
if not self.log_file.is_file():
exception_str += f"\n No log file found at: {self.log_file.absolute()!s}"

else:
try:
full_log_file_text = self.log_file.read_text()
except Exception as ex:
full_log_file_text = (
f"[ERROR] Log file could not be read from: {self.log_file.absolute()!s}"
f"\nRead error: {ex!s}"
)

exception_str += (
f"\n Full log file text from {self.log_file.absolute()!s}:"
+ VERTICAL_SEPARATOR
+ full_log_file_text
+ VERTICAL_SEPARATOR
)
else:
exception_str += f"\n Log file: {self.log_file.absolute()!s}"
return exception_str

def __repr__(self) -> str:
Expand Down
Loading