Skip to content

Commit 280e172

Browse files
authored
fix: remove 'cdk:low-code' logic with false-positives for manifest-only connector detection (#637)
1 parent 753c2ac commit 280e172

File tree

12 files changed

+418
-144
lines changed

12 files changed

+418
-144
lines changed

.github/workflows/python_pytest.yml

+21-10
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,6 @@ jobs:
4848
--durations=5 --exitfirst
4949
-m "not slow and not requires_creds and not linting and not flaky"
5050
51-
- name: Run Pytest with Coverage (Flaky Tests Only)
52-
timeout-minutes: 60
53-
continue-on-error: true
54-
env:
55-
GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }}
56-
run: >
57-
poetry run coverage run -m pytest
58-
--durations=5 --exitfirst
59-
-m "flaky and not slow and not requires_creds"
60-
6151
- name: Print Coverage Report
6252
if: always()
6353
run: poetry run coverage report
@@ -75,6 +65,13 @@ jobs:
7565
name: fasttest-coverage
7666
path: htmlcov/
7767

68+
- name: Upload logs to GitHub Artifacts
69+
if: failure()
70+
uses: actions/upload-artifact@v4
71+
with:
72+
name: pytest-fast-test-logs
73+
path: /tmp/airbyte/logs/
74+
7875
pytest-no-creds:
7976
name: Pytest (No Creds)
8077
runs-on: ubuntu-latest
@@ -122,6 +119,13 @@ jobs:
122119
name: nocreds-test-coverage
123120
path: htmlcov/
124121

122+
- name: Upload logs to GitHub Artifacts
123+
if: failure()
124+
uses: actions/upload-artifact@v4
125+
with:
126+
name: pytest-no-creds-test-logs
127+
path: /tmp/airbyte/logs/
128+
125129
pytest:
126130
name: Pytest (All, Python ${{ matrix.python-version }}, ${{ matrix.os }})
127131
# Don't run on forks. Run on pushes to main, and on PRs that are not from forks.
@@ -189,6 +193,13 @@ jobs:
189193
name: py${{ matrix.python-version }}-${{ matrix.os }}-test-coverage
190194
path: htmlcov/
191195

196+
- name: Upload logs to GitHub Artifacts
197+
if: failure()
198+
uses: actions/upload-artifact@v4
199+
with:
200+
name: py${{ matrix.python-version }}-${{ matrix.os }}-test-logs
201+
path: /tmp/airbyte/logs/
202+
192203
dependency-analysis:
193204
name: Dependency Analysis with Deptry
194205
runs-on: ubuntu-latest

airbyte/_executors/base.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,14 @@ def _cli(self) -> list[str]:
185185
"""
186186
...
187187

188+
def map_cli_args(self, args: list[str]) -> list[str]:
189+
"""Map CLI args if needed.
190+
191+
By default, this is a no-op. Subclasses may override this method in order to
192+
map CLI args into the format expected by the connector.
193+
"""
194+
return args
195+
188196
def execute(
189197
self,
190198
args: list[str],
@@ -195,8 +203,9 @@ def execute(
195203
196204
If stdin is provided, it will be passed to the subprocess as STDIN.
197205
"""
206+
mapped_args = self.map_cli_args(args)
198207
with _stream_from_subprocess(
199-
[*self._cli, *args],
208+
[*self._cli, *mapped_args],
200209
stdin=stdin,
201210
) as stream_lines:
202211
yield from stream_lines

airbyte/_executors/declarative.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919

2020
if TYPE_CHECKING:
21+
from argparse import Namespace
2122
from collections.abc import Iterator
2223

2324
from airbyte._message_iterators import AirbyteMessageIterator
@@ -102,7 +103,9 @@ def execute(
102103
"""Execute the declarative source."""
103104
_ = stdin # Not used
104105
source_entrypoint = AirbyteEntrypoint(self.declarative_source)
105-
parsed_args = source_entrypoint.parse_args(args)
106+
107+
mapped_args: list[str] = self.map_cli_args(args)
108+
parsed_args: Namespace = source_entrypoint.parse_args(mapped_args)
106109
yield from source_entrypoint.run(parsed_args)
107110

108111
def ensure_installation(self, *, auto_fix: bool = True) -> None:

airbyte/_executors/docker.py

+48
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,33 @@
11
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
22
from __future__ import annotations
33

4+
import logging
45
import shutil
6+
from pathlib import Path
57
from typing import NoReturn
68

79
from airbyte import exceptions as exc
810
from airbyte._executors.base import Executor
911

1012

13+
logger = logging.getLogger("airbyte")
14+
15+
16+
DEFAULT_AIRBYTE_CONTAINER_TEMP_DIR = "/airbyte/tmp"
17+
"""Default temp dir in an Airbyte connector's Docker image."""
18+
19+
1120
class DockerExecutor(Executor):
1221
def __init__(
1322
self,
1423
name: str | None = None,
1524
*,
1625
executable: list[str],
1726
target_version: str | None = None,
27+
volumes: dict[Path, str] | None = None,
1828
) -> None:
1929
self.executable: list[str] = executable
30+
self.volumes: dict[Path, str] = volumes or {}
2031
name = name or executable[0]
2132
super().__init__(name=name, target_version=target_version)
2233

@@ -56,3 +67,40 @@ def uninstall(self) -> NoReturn:
5667
def _cli(self) -> list[str]:
5768
"""Get the base args of the CLI executable."""
5869
return self.executable
70+
71+
def map_cli_args(self, args: list[str]) -> list[str]:
72+
"""Map local file paths to the container's volume paths."""
73+
new_args = []
74+
for arg in args:
75+
if Path(arg).exists():
76+
# This is a file path and we need to map it to the same file within the
77+
# relative path of the file within the container's volume.
78+
for local_volume, container_path in self.volumes.items():
79+
if Path(arg).is_relative_to(local_volume):
80+
logger.debug(
81+
f"Found file input path `{arg}` "
82+
f"relative to container-mapped volume: {local_volume}"
83+
)
84+
mapped_path = Path(container_path) / Path(arg).relative_to(local_volume)
85+
logger.debug(f"Mapping `{arg}` -> `{mapped_path}`")
86+
new_args.append(str(mapped_path))
87+
break
88+
else:
89+
# No break reached; a volume was found for this file path
90+
logger.warning(
91+
f"File path `{arg}` is not relative to any volume path "
92+
f"in the provided volume mappings: {self.volumes}. "
93+
"The file may not be available to the container at runtime."
94+
)
95+
new_args.append(arg)
96+
97+
else:
98+
new_args.append(arg)
99+
100+
if args != new_args:
101+
logger.debug(
102+
f"Mapping local-to-container CLI args: {args} -> {new_args} "
103+
f"based upon volume definitions: {self.volumes}"
104+
)
105+
106+
return new_args

airbyte/_executors/util.py

+23-10
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
from airbyte import exceptions as exc
1313
from airbyte._executors.declarative import DeclarativeExecutor
14-
from airbyte._executors.docker import DockerExecutor
14+
from airbyte._executors.docker import DEFAULT_AIRBYTE_CONTAINER_TEMP_DIR, DockerExecutor
1515
from airbyte._executors.local import PathExecutor
1616
from airbyte._executors.python import VenvExecutor
1717
from airbyte._util.meta import which
@@ -42,8 +42,8 @@ def _try_get_source_manifest(
4242
4343
Raises:
4444
- `PyAirbyteInputError`: If `source_name` is `None`.
45-
- `HTTPError`: If fetching the URL was unsuccessful.
46-
- `YAMLError`: If parsing the YAML failed.
45+
- `AirbyteConnectorInstallationError`: If the registry file cannot be downloaded or if the
46+
manifest YAML cannot be parsed.
4747
"""
4848
if source_name is None:
4949
raise exc.PyAirbyteInputError(
@@ -62,7 +62,16 @@ def _try_get_source_manifest(
6262
url=manifest_url,
6363
headers={"User-Agent": f"PyAirbyte/{get_version()}"},
6464
)
65-
response.raise_for_status() # Raise HTTPError exception if the download failed
65+
try:
66+
response.raise_for_status() # Raise HTTPError exception if the download failed
67+
except requests.exceptions.HTTPError as ex:
68+
raise exc.AirbyteConnectorInstallationError(
69+
message="Failed to download the connector manifest.",
70+
context={
71+
"manifest_url": manifest_url,
72+
},
73+
) from ex
74+
6675
try:
6776
return cast("dict", yaml.safe_load(response.text))
6877
except yaml.YAMLError as ex:
@@ -115,7 +124,7 @@ def _get_local_executor(
115124
)
116125

117126

118-
def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too many branches/arugments/statements
127+
def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too many branches/arguments/statements
119128
name: str,
120129
*,
121130
version: str | None = None,
@@ -217,21 +226,24 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too many branch
217226
if ":" not in docker_image:
218227
docker_image = f"{docker_image}:{version or 'latest'}"
219228

220-
temp_dir = TEMP_DIR_OVERRIDE or Path(tempfile.gettempdir())
229+
host_temp_dir = TEMP_DIR_OVERRIDE or Path(tempfile.gettempdir())
230+
container_temp_dir = DEFAULT_AIRBYTE_CONTAINER_TEMP_DIR
221231

222232
local_mount_dir = Path().absolute() / name
223233
local_mount_dir.mkdir(exist_ok=True)
224234

235+
volumes = {
236+
local_mount_dir: "/local",
237+
host_temp_dir: container_temp_dir,
238+
}
225239
docker_cmd = [
226240
"docker",
227241
"run",
228242
"--rm",
229243
"-i",
230-
"--volume",
231-
f"{local_mount_dir}:/local/",
232-
"--volume",
233-
f"{temp_dir}:{temp_dir}",
234244
]
245+
for local_dir, container_dir in volumes.items():
246+
docker_cmd.extend(["--volume", f"{local_dir}:{container_dir}"])
235247

236248
if use_host_network is True:
237249
docker_cmd.extend(["--network", "host"])
@@ -241,6 +253,7 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too many branch
241253
return DockerExecutor(
242254
name=name,
243255
executable=docker_cmd,
256+
volumes=volumes,
244257
)
245258

246259
if source_manifest:

airbyte/_util/temp_files.py

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from __future__ import annotations
55

66
import json
7+
import stat
78
import tempfile
89
import time
910
import warnings
@@ -36,6 +37,9 @@ def as_temp_files(files_contents: list[dict | str]) -> Generator[list[str], Any,
3637
json.dumps(content) if isinstance(content, dict) else content,
3738
)
3839
temp_file.flush()
40+
# Grant "read" permission to all users
41+
Path(temp_file.name).chmod(stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
42+
3943
# Don't close the file yet (breaks Windows)
4044
# temp_file.close()
4145
temp_files.append(temp_file)

airbyte/constants.py

+15
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,18 @@ def _str_to_bool(value: str) -> bool:
109109
This setting helps you make informed choices about data privacy and operation in restricted and
110110
air-gapped environments.
111111
"""
112+
113+
AIRBYTE_PRINT_FULL_ERROR_LOGS: bool = _str_to_bool(
114+
os.getenv(
115+
key="AIRBYTE_PRINT_FULL_ERROR_LOGS",
116+
default=os.getenv("CI", "false"),
117+
)
118+
)
119+
"""Whether to print full error logs when an error occurs.
120+
This setting helps in debugging by providing detailed logs when errors occur. This is especially
121+
helpful in ephemeral environments like CI/CD pipelines where log files may not be persisted after
122+
the pipeline run.
123+
124+
If not set, the default value is `False` for non-CI environments.
125+
If running in a CI environment ("CI" env var is set), then the default value is `True`.
126+
"""

airbyte/exceptions.py

+26-3
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
from textwrap import indent
4545
from typing import TYPE_CHECKING, Any
4646

47+
from airbyte.constants import AIRBYTE_PRINT_FULL_ERROR_LOGS
48+
4749

4850
if TYPE_CHECKING:
4951
from airbyte._util.api_duck_types import AirbyteApiResponseDuckType
@@ -68,6 +70,7 @@ class PyAirbyteError(Exception):
6870
help_url: str | None = None
6971
log_text: str | list[str] | None = None
7072
log_file: Path | None = None
73+
print_full_log: bool = AIRBYTE_PRINT_FULL_ERROR_LOGS
7174
context: dict[str, Any] | None = None
7275
message: str | None = None
7376
original_exception: Exception | None = None
@@ -93,6 +96,7 @@ def __str__(self) -> str:
9396
"log_text",
9497
"context",
9598
"log_file",
99+
"print_full_log",
96100
"original_exception",
97101
]
98102
display_properties = {
@@ -119,9 +123,6 @@ def __str__(self) -> str:
119123
if context_str:
120124
exception_str += "\n " + context_str
121125

122-
if self.log_file:
123-
exception_str += f"\n Log file: {self.log_file.absolute()!s}"
124-
125126
if self.log_text:
126127
if isinstance(self.log_text, list):
127128
self.log_text = "\n".join(self.log_text)
@@ -131,6 +132,28 @@ def __str__(self) -> str:
131132
if self.original_exception:
132133
exception_str += VERTICAL_SEPARATOR + f"\nCaused by: {self.original_exception!s}"
133134

135+
if self.log_file:
136+
if self.print_full_log:
137+
if not self.log_file.is_file():
138+
exception_str += f"\n No log file found at: {self.log_file.absolute()!s}"
139+
140+
else:
141+
try:
142+
full_log_file_text = self.log_file.read_text()
143+
except Exception as ex:
144+
full_log_file_text = (
145+
f"[ERROR] Log file could not be read from: {self.log_file.absolute()!s}"
146+
f"\nRead error: {ex!s}"
147+
)
148+
149+
exception_str += (
150+
f"\n Full log file text from {self.log_file.absolute()!s}:"
151+
+ VERTICAL_SEPARATOR
152+
+ full_log_file_text
153+
+ VERTICAL_SEPARATOR
154+
)
155+
else:
156+
exception_str += f"\n Log file: {self.log_file.absolute()!s}"
134157
return exception_str
135158

136159
def __repr__(self) -> str:

0 commit comments

Comments
 (0)