Skip to content

Commit 8e0cbba

Browse files
committed
refactor http_cache as property of executor
1 parent 82c4c83 commit 8e0cbba

File tree

8 files changed

+122
-94
lines changed

8 files changed

+122
-94
lines changed

airbyte/_connector_base.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ def _execute(
433433
)
434434

435435
try:
436-
for line in self.executor.execute(args, stdin=stdin, env=env):
436+
for line in self.executor.execute(args, stdin=stdin):
437437
try:
438438
message: AirbyteMessage = AirbyteMessage.model_validate_json(json_data=line)
439439
if progress_tracker and message.record:
@@ -464,7 +464,7 @@ def _execute(
464464
connector_name=self.name,
465465
log_text=self._last_log_messages,
466466
original_exception=e,
467-
) from None
467+
) from e
468468

469469

470470
__all__ = [

airbyte/_executors/base.py

+18-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
22
from __future__ import annotations
33

4+
import os
45
import subprocess
56
from abc import ABC, abstractmethod
67
from contextlib import contextmanager
@@ -14,6 +15,7 @@
1415
if TYPE_CHECKING:
1516
from collections.abc import Generator, Iterable, Iterator
1617

18+
from airbyte.http_caching.cache import AirbyteConnectorCache
1719
from airbyte.sources.registry import ConnectorMetadata
1820

1921

@@ -157,6 +159,7 @@ def __init__(
157159
name: str | None = None,
158160
metadata: ConnectorMetadata | None = None,
159161
target_version: str | None = None,
162+
http_cache: AirbyteConnectorCache | None = None,
160163
) -> None:
161164
"""Initialize a connector executor.
162165
@@ -170,6 +173,7 @@ def __init__(
170173
) # metadata is not None here
171174
self.metadata: ConnectorMetadata | None = metadata
172175
self.enforce_version: bool = target_version is not None
176+
self.http_cache: AirbyteConnectorCache | None = http_cache
173177

174178
self.reported_version: str | None = None
175179
self.target_version: str | None = None
@@ -201,7 +205,6 @@ def execute(
201205
args: list[str],
202206
*,
203207
stdin: IO[str] | AirbyteMessageIterator | None = None,
204-
env: dict[str, str] | None = None,
205208
) -> Iterator[str]:
206209
"""Execute a command and return an iterator of STDOUT lines.
207210
@@ -211,10 +214,23 @@ def execute(
211214
with _stream_from_subprocess(
212215
[*self._cli, *mapped_args],
213216
stdin=stdin,
214-
env=env,
217+
env=self.env_vars,
215218
) as stream_lines:
216219
yield from stream_lines
217220

221+
@property
222+
def env_vars(self) -> dict[str, str]:
223+
"""Get the environment variables for the connector.
224+
225+
By default, this is an empty dict. Subclasses may override this method
226+
to provide custom environment variables.
227+
"""
228+
result = cast(dict[str,str], os.environ.copy())
229+
if self.http_cache:
230+
result.update(self.http_cache.get_env_vars())
231+
232+
return result
233+
218234
@abstractmethod
219235
def ensure_installation(self, *, auto_fix: bool = True) -> None:
220236
_ = auto_fix

airbyte/_executors/declarative.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from collections.abc import Iterator
2323

2424
from airbyte._message_iterators import AirbyteMessageIterator
25+
from airbyte.http_caching.cache import AirbyteConnectorCache
2526

2627

2728
def _suppress_cdk_pydantic_deprecation_warnings() -> None:
@@ -43,6 +44,8 @@ def __init__(
4344
self,
4445
name: str,
4546
manifest: dict | Path,
47+
*,
48+
http_cache: AirbyteConnectorCache | None = None,
4649
) -> None:
4750
"""Initialize a declarative executor.
4851
@@ -54,6 +57,8 @@ def __init__(
5457

5558
self.name = name
5659
self._manifest_dict: dict
60+
self.http_cache = http_cache
61+
5762
if isinstance(manifest, Path):
5863
self._manifest_dict = cast("dict", yaml.safe_load(manifest.read_text()))
5964

@@ -99,7 +104,6 @@ def execute(
99104
args: list[str],
100105
*,
101106
stdin: IO[str] | AirbyteMessageIterator | None = None,
102-
env: dict[str, str] | None = None,
103107
) -> Iterator[str]:
104108
"""Execute the declarative source."""
105109
_ = stdin, env # Not used

airbyte/_executors/docker.py

+46-10
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
22
from __future__ import annotations
33

4+
import http
45
import logging
56
import shutil
7+
import tempfile
68
from pathlib import Path
79
from typing import NoReturn
810

911
from airbyte import exceptions as exc
1012
from airbyte._executors.base import Executor
13+
from airbyte.constants import TEMP_DIR_OVERRIDE
14+
from airbyte.http_caching.cache import AirbyteConnectorCache
1115

1216

1317
logger = logging.getLogger("airbyte")
@@ -20,16 +24,26 @@
2024
class DockerExecutor(Executor):
2125
def __init__(
2226
self,
23-
name: str | None = None,
27+
name: str,
28+
docker_image: str,
2429
*,
25-
executable: list[str],
26-
target_version: str | None = None,
27-
volumes: dict[Path, str] | None = None,
30+
use_host_network: bool = False,
31+
http_cache: AirbyteConnectorCache | None = None,
2832
) -> None:
29-
self.executable: list[str] = executable
30-
self.volumes: dict[Path, str] = volumes or {}
31-
name = name or executable[0]
32-
super().__init__(name=name, target_version=target_version)
33+
self.docker_image: str = docker_image
34+
self.use_host_network: bool = use_host_network
35+
36+
local_mount_dir = Path().absolute() / name
37+
local_mount_dir.mkdir(exist_ok=True)
38+
self.volumes = {
39+
local_mount_dir: "/local",
40+
(TEMP_DIR_OVERRIDE or Path(tempfile.gettempdir())): DEFAULT_AIRBYTE_CONTAINER_TEMP_DIR,
41+
}
42+
43+
super().__init__(
44+
name=name or self.docker_image,
45+
http_cache=http_cache,
46+
)
3347

3448
def ensure_installation(
3549
self,
@@ -65,8 +79,30 @@ def uninstall(self) -> NoReturn:
6579

6680
@property
6781
def _cli(self) -> list[str]:
68-
"""Get the base args of the CLI executable."""
69-
return self.executable
82+
"""Get the executable CLI command.
83+
84+
For Docker executors, this is the `docker run` command with the necessary arguments
85+
to run the connector in a Docker container. This will be extended (suffixed) by the
86+
connector's CLI commands and arguments, such as 'spec' or 'check --config=...'.
87+
"""
88+
result: list[str] = [
89+
"docker",
90+
"run",
91+
"--rm",
92+
"-i",
93+
]
94+
for local_dir, container_dir in self.volumes.items():
95+
result.extend(["--volume", f"{local_dir}:{container_dir}"])
96+
97+
if self.http_cache:
98+
for key, value in self.http_cache.get_env_vars().items():
99+
result.extend(["-e", f"{key}={value}"])
100+
101+
if self.use_host_network:
102+
result.extend(["--network", "host"])
103+
104+
result.append(self.docker_image)
105+
return result
70106

71107
def map_cli_args(self, args: list[str]) -> list[str]:
72108
"""Map local file paths to the container's volume paths."""

airbyte/_executors/util.py

+11-30
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
22
from __future__ import annotations
33

4-
import tempfile
54
from pathlib import Path
65
from typing import TYPE_CHECKING, Literal, cast
76

@@ -11,18 +10,19 @@
1110

1211
from airbyte import exceptions as exc
1312
from airbyte._executors.declarative import DeclarativeExecutor
14-
from airbyte._executors.docker import DEFAULT_AIRBYTE_CONTAINER_TEMP_DIR, DockerExecutor
13+
from airbyte._executors.docker import DockerExecutor
1514
from airbyte._executors.local import PathExecutor
1615
from airbyte._executors.python import VenvExecutor
1716
from airbyte._util.meta import which
1817
from airbyte._util.telemetry import EventState, log_install_state # Non-public API
19-
from airbyte.constants import AIRBYTE_OFFLINE_MODE, TEMP_DIR_OVERRIDE
18+
from airbyte.constants import AIRBYTE_OFFLINE_MODE
2019
from airbyte.sources.registry import ConnectorMetadata, InstallType, get_connector_metadata
2120
from airbyte.version import get_version
2221

2322

2423
if TYPE_CHECKING:
2524
from airbyte._executors.base import Executor
25+
from airbyte.http_caching.cache import AirbyteConnectorCache
2626

2727

2828
VERSION_LATEST = "latest"
@@ -124,7 +124,7 @@ def _get_local_executor(
124124
)
125125

126126

127-
def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too many branches/arguments/statements
127+
def get_connector_executor( # noqa: PLR0912, PLR0913 # Too many branches/arguments
128128
name: str,
129129
*,
130130
version: str | None = None,
@@ -135,6 +135,7 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too many branch
135135
source_manifest: bool | dict | Path | str | None = None,
136136
install_if_missing: bool = True,
137137
install_root: Path | None = None,
138+
http_cache: AirbyteConnectorCache | None = None,
138139
) -> Executor:
139140
"""This factory function creates an executor for a connector.
140141
@@ -226,41 +227,19 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too many branch
226227
if ":" not in docker_image:
227228
docker_image = f"{docker_image}:{version or 'latest'}"
228229

229-
host_temp_dir = TEMP_DIR_OVERRIDE or Path(tempfile.gettempdir())
230-
container_temp_dir = DEFAULT_AIRBYTE_CONTAINER_TEMP_DIR
231-
232-
local_mount_dir = Path().absolute() / name
233-
local_mount_dir.mkdir(exist_ok=True)
234-
235-
volumes = {
236-
local_mount_dir: "/local",
237-
host_temp_dir: container_temp_dir,
238-
}
239-
docker_cmd = [
240-
"docker",
241-
"run",
242-
"--rm",
243-
"-i",
244-
]
245-
for local_dir, container_dir in volumes.items():
246-
docker_cmd.extend(["--volume", f"{local_dir}:{container_dir}"])
247-
248-
if use_host_network is True:
249-
docker_cmd.extend(["--network", "host"])
250-
251-
docker_cmd.extend([docker_image])
252-
253230
return DockerExecutor(
254231
name=name,
255-
executable=docker_cmd,
256-
volumes=volumes,
232+
docker_image=docker_image,
233+
use_host_network=use_host_network,
234+
http_cache=http_cache,
257235
)
258236

259237
if source_manifest:
260238
if isinstance(source_manifest, dict | Path):
261239
return DeclarativeExecutor(
262240
name=name,
263241
manifest=source_manifest,
242+
http_cache=http_cache,
264243
)
265244

266245
if isinstance(source_manifest, str | bool):
@@ -273,6 +252,7 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too many branch
273252
return DeclarativeExecutor(
274253
name=name,
275254
manifest=source_manifest,
255+
http_cache=http_cache,
276256
)
277257

278258
# else: we are installing a connector in a Python virtual environment:
@@ -284,6 +264,7 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too many branch
284264
target_version=version,
285265
pip_url=pip_url,
286266
install_root=install_root,
267+
http_cache=http_cache,
287268
)
288269
if install_if_missing:
289270
executor.ensure_installation()

airbyte/http_caching/cache.py

+15
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,21 @@ def __init__(
6868
self._proxy: DumpMaster | None = None
6969
self._addon: HttpCachingAddon | None = None
7070

71+
def get_env_vars(self) -> dict[str, str]:
72+
"""Get the environment variables to apply to processes using the HTTP proxy."""
73+
env_vars = {}
74+
if self._proxy_port:
75+
proxy_url = f"http://127.0.0.1:{self._proxy_port}"
76+
env_vars = {
77+
"HTTP_PROXY": proxy_url,
78+
"HTTPS_PROXY": proxy_url,
79+
"http_proxy": proxy_url,
80+
"https_proxy": proxy_url,
81+
"NO_PROXY": "localhost,127.0.0.1",
82+
"no_proxy": "localhost,127.0.0.1",
83+
}
84+
return env_vars
85+
7186
def start(self) -> int:
7287
"""Start the HTTP proxy.
7388

0 commit comments

Comments
 (0)