Skip to content

feat: implement HTTP caching with mitmproxy's native format #646

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
737c0c0
feat: implement HTTP caching for connectors
devin-ai-integration[bot] Mar 30, 2025
634f4de
fix: fix import ordering and type ignore issues
devin-ai-integration[bot] Mar 30, 2025
5fcfb92
chore: update poetry.lock
devin-ai-integration[bot] Mar 30, 2025
3a35bb1
fix: fix type annotation issues and unreachable code
devin-ai-integration[bot] Mar 30, 2025
61b4622
style: fix formatting and linting issues
devin-ai-integration[bot] Mar 30, 2025
e25a223
refactor: replace pickle with mitmproxy's native format for HTTP caching
devin-ai-integration[bot] Mar 30, 2025
e8696e4
fix: improve file path handling for Windows compatibility
devin-ai-integration[bot] Mar 30, 2025
b9fa560
docs: fix HAR format name in README
devin-ai-integration[bot] Mar 30, 2025
88a8895
chore: add `poe install` shortcut (side quest)
aaronsteers Mar 30, 2025
6755085
add cached example script
aaronsteers Mar 30, 2025
1e94da3
fix: resolve asyncio and serialization issues in HTTP caching example
devin-ai-integration[bot] Mar 30, 2025
82c4c83
(stacked pr): improve asyncio event loop issues in HTTP caching imple…
devin-ai-integration[bot] Apr 1, 2025
8e0cbba
refactor http_cache as property of executor
aaronsteers Apr 1, 2025
46cd360
mypy fixes
aaronsteers Apr 1, 2025
598af6e
working proxy redirect in docker container
aaronsteers Apr 1, 2025
a7b925c
refactor proxy implementation
aaronsteers Apr 2, 2025
9b78812
add cursor rules file
aaronsteers Apr 2, 2025
a343634
most functionality working 🚀 (except consolidation)
aaronsteers Apr 2, 2025
4c68047
committing old wip changes for combined ca
aaronsteers Apr 7, 2025
ef43d32
Merge branch 'main' into devin/1743363370-http-caching
aaronsteers Apr 7, 2025
f2f48fb
fix tests
aaronsteers Apr 16, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .cursorrules
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Cursor Rules file

## Python Best Practices

### Modern Typing

- Always use types.
- Always use modern native types (dict not Dict, tuple, not Tuple).
- Always use pipe (|) instead of `Union` for combining types.
- Always use `x | None` instead of `Optional[x]`.

### File and Directory Access

- Always use `pathlib` and `Path` instead of older `os` methods.
- Always prefer `Path.read_text()` and `Path.write_text()` instead of `open()` and an unnecessary context manager.
6 changes: 6 additions & 0 deletions airbyte/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
from airbyte.datasets import CachedDataset
from airbyte.destinations.base import Destination
from airbyte.destinations.util import get_destination
from airbyte.http_caching import AirbyteConnectorCache, HttpCacheMode, SerializationFormat
from airbyte.records import StreamRecord
from airbyte.results import ReadResult, WriteResult
from airbyte.secrets import SecretSourceEnum, get_secret
Expand All @@ -154,6 +155,7 @@
documents,
exceptions, # noqa: ICN001 # No 'exc' alias for top-level module
experimental,
http_caching,
logs,
records,
results,
Expand All @@ -174,6 +176,7 @@
"documents",
"exceptions",
"experimental",
"http_caching",
"logs",
"records",
"registry",
Expand All @@ -189,12 +192,15 @@
"get_source",
"new_local_cache",
# Classes
"AirbyteConnectorCache",
"BigQueryCache",
"CachedDataset",
"Destination",
"DuckDBCache",
"HttpCacheMode",
"ReadResult",
"SecretSourceEnum",
"SerializationFormat",
"Source",
"StreamRecord",
"WriteResult",
Expand Down
2 changes: 1 addition & 1 deletion airbyte/_connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ def _execute(
connector_name=self.name,
log_text=self._last_log_messages,
original_exception=e,
) from None
) from e


__all__ = [
Expand Down
71 changes: 69 additions & 2 deletions airbyte/_executors/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from __future__ import annotations

import os
import subprocess
from abc import ABC, abstractmethod
from contextlib import contextmanager
Expand All @@ -9,11 +10,13 @@

from airbyte import exceptions as exc
from airbyte._message_iterators import AirbyteMessageIterator
from airbyte.http_caching.mitm_proxy import DOCKER_HOST, LOCALHOST


if TYPE_CHECKING:
from collections.abc import Generator, Iterable, Iterator

from airbyte.http_caching.cache import AirbyteConnectorCache
from airbyte.sources.registry import ConnectorMetadata


Expand Down Expand Up @@ -62,6 +65,7 @@ def _stream_from_subprocess(
*,
stdin: IO[str] | AirbyteMessageIterator | None = None,
log_file: IO[str] | None = None,
env: dict[str, str] | None = None,
) -> Generator[Iterable[str], None, None]:
"""Stream lines from a subprocess."""
input_thread: Thread | None = None
Expand All @@ -74,6 +78,7 @@ def _stream_from_subprocess(
stderr=log_file,
universal_newlines=True,
encoding="utf-8",
env=env,
)
input_thread = Thread(
target=_pump_input,
Expand Down Expand Up @@ -102,6 +107,7 @@ def _stream_from_subprocess(
stderr=log_file,
universal_newlines=True,
encoding="utf-8",
env=env,
)

if process.stdout is None:
Expand Down Expand Up @@ -154,6 +160,7 @@ def __init__(
name: str | None = None,
metadata: ConnectorMetadata | None = None,
target_version: str | None = None,
http_cache: AirbyteConnectorCache | None = None,
) -> None:
"""Initialize a connector executor.

Expand All @@ -167,6 +174,7 @@ def __init__(
) # metadata is not None here
self.metadata: ConnectorMetadata | None = metadata
self.enforce_version: bool = target_version is not None
self.http_cache: AirbyteConnectorCache | None = http_cache

self.reported_version: str | None = None
self.target_version: str | None = None
Expand All @@ -176,6 +184,42 @@ def __init__(
else:
self.target_version = target_version

@property
def _proxy_host(self) -> str | None:
"""Return the host name of the proxy server.

This can be overridden in cases (like) docker, where we need to
remap localhost to host.docker.internal.
"""
if self.http_cache:
return self.http_cache.proxy_host

return None

@property
def _proxy_env_vars(self) -> dict[str, str]:
"""Return the environment variables for the proxy server.

This is used to set the HTTP_PROXY and HTTPS_PROXY environment variables.
"""
if not self.http_cache:
return {}

# Generally proxy_host will be 'host.docker.internal' or 'localhost'
proxy_host = self._proxy_host
proxy_port = self.http_cache.proxy_port

if proxy_host and proxy_port:
return {
"HTTP_PROXY": f"http://{proxy_host}:{proxy_port}",
"HTTPS_PROXY": f"http://{proxy_host}:{proxy_port}",
"NO_PROXY": proxy_host,
# This doesn't work unfortunately:
# "REQUESTS_CA_BUNDLE": "/airbyte/mitm-certs/mitmproxy-ca-cert.pem",
}

return {}

@property
@abstractmethod
def _cli(self) -> list[str]:
Expand Down Expand Up @@ -203,13 +247,36 @@ def execute(

If stdin is provided, it will be passed to the subprocess as STDIN.
"""
mapped_args = self.map_cli_args(args)
cli_cmd = [*self._cli, *self.map_cli_args(args)]
print("Executing command:", " ".join(cli_cmd))
with _stream_from_subprocess(
[*self._cli, *mapped_args],
cli_cmd,
stdin=stdin,
env=self.env_vars,
) as stream_lines:
yield from stream_lines

@property
def env_vars(self) -> dict[str, str]:
"""Get the environment variables for the connector.

By default, this is a copy of `os.environ`. Subclasses may override this method
to provide custom environment variables.

In the future, we may reduce the number of environment variables we pass to the
connector, but for now we pass all of them. This is useful for connectors that
rely on environment variables to configure their behavior.
"""
result = os.environ.copy()
if self.http_cache:
host_name = self.http_cache.proxy_host
if host_name is LOCALHOST:
host_name = f"http://{DOCKER_HOST}"

result.update(self._proxy_env_vars)

return result

@abstractmethod
def ensure_installation(self, *, auto_fix: bool = True) -> None:
_ = auto_fix
Expand Down
5 changes: 5 additions & 0 deletions airbyte/_executors/declarative.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from collections.abc import Iterator

from airbyte._message_iterators import AirbyteMessageIterator
from airbyte.http_caching.cache import AirbyteConnectorCache


def _suppress_cdk_pydantic_deprecation_warnings() -> None:
Expand All @@ -43,6 +44,8 @@ def __init__(
self,
name: str,
manifest: dict | Path,
*,
http_cache: AirbyteConnectorCache | None = None,
) -> None:
"""Initialize a declarative executor.

Expand All @@ -54,6 +57,8 @@ def __init__(

self.name = name
self._manifest_dict: dict
self.http_cache = http_cache

if isinstance(manifest, Path):
self._manifest_dict = cast("dict", yaml.safe_load(manifest.read_text()))

Expand Down
84 changes: 73 additions & 11 deletions airbyte/_executors/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@

import logging
import shutil
import tempfile
from pathlib import Path
from typing import NoReturn
from typing import TYPE_CHECKING, NoReturn

from airbyte import exceptions as exc
from airbyte._executors.base import Executor
from airbyte.constants import TEMP_DIR_OVERRIDE
from airbyte.http_caching.mitm_proxy import DOCKER_HOST, LOCALHOST


if TYPE_CHECKING:
from airbyte.http_caching.cache import AirbyteConnectorCache


logger = logging.getLogger("airbyte")
Expand All @@ -20,16 +27,34 @@
class DockerExecutor(Executor):
def __init__(
self,
name: str | None = None,
name: str,
docker_image: str,
*,
executable: list[str],
target_version: str | None = None,
volumes: dict[Path, str] | None = None,
use_host_network: bool = False,
http_cache: AirbyteConnectorCache | None = None,
) -> None:
self.executable: list[str] = executable
self.volumes: dict[Path, str] = volumes or {}
name = name or executable[0]
super().__init__(name=name, target_version=target_version)
self.docker_image: str = docker_image
self.use_host_network: bool = use_host_network

local_mount_dir = Path().absolute() / name
local_mount_dir.mkdir(exist_ok=True)
self.volumes = {
local_mount_dir: "/local",
(TEMP_DIR_OVERRIDE or Path(tempfile.gettempdir())): DEFAULT_AIRBYTE_CONTAINER_TEMP_DIR,
}
if http_cache:
# Mount the cache directory inside the container.
# This allows the connector to access the cached files.

# Add volumes from the HTTP cache
# This includes the CA certificate for SSL verification
for host_path, container_path in http_cache.get_docker_volumes().items():
self.volumes[Path(host_path)] = container_path

super().__init__(
name=name or self.docker_image,
http_cache=http_cache,
)

def ensure_installation(
self,
Expand Down Expand Up @@ -65,8 +90,45 @@ def uninstall(self) -> NoReturn:

@property
def _cli(self) -> list[str]:
"""Get the base args of the CLI executable."""
return self.executable
"""Get the executable CLI command.

For Docker executors, this is the `docker run` command with the necessary arguments
to run the connector in a Docker container. This will be extended (suffixed) by the
connector's CLI commands and arguments, such as 'spec' or 'check --config=...'.
"""
result: list[str] = [
"docker",
"run",
"--rm",
"-i",
]
for local_dir, container_dir in self.volumes.items():
result.extend(["--volume", f"{local_dir}:{container_dir}"])

for key, value in self._proxy_env_vars.items():
result.extend(["-e", f"{key}={value}"])

if self.use_host_network:
result.extend(["--network", "host"])

result.append(self.docker_image)
return result

@property
def _proxy_host(self) -> str | None:
"""Return the host name of the proxy server.

This is used to set the HTTP_PROXY and HTTPS_PROXY environment variables.
"""
result: str | None = None
if self.http_cache:
result = self.http_cache.proxy_host
if result == LOCALHOST:
# Docker containers cannot access localhost on the host machine.
# Use host.docker.internal instead.
return DOCKER_HOST

return result

def map_cli_args(self, args: list[str]) -> list[str]:
"""Map local file paths to the container's volume paths."""
Expand Down
10 changes: 9 additions & 1 deletion airbyte/_executors/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@


if TYPE_CHECKING:
from airbyte.http_caching.cache import AirbyteConnectorCache
from airbyte.sources.registry import ConnectorMetadata


Expand All @@ -32,6 +33,7 @@ def __init__(
target_version: str | None = None,
pip_url: str | None = None,
install_root: Path | None = None,
http_cache: AirbyteConnectorCache | None = None,
) -> None:
"""Initialize a connector executor that runs a connector in a virtual environment.

Expand All @@ -42,8 +44,14 @@ def __init__(
pip_url: (Optional.) The pip URL of the connector to install.
install_root: (Optional.) The root directory where the virtual environment will be
created. If not provided, the current working directory will be used.
http_cache: (Optional.) The HTTP cache to use for downloading the connector.
"""
super().__init__(name=name, metadata=metadata, target_version=target_version)
super().__init__(
name=name,
metadata=metadata,
target_version=target_version,
http_cache=http_cache,
)

if not pip_url and metadata and not metadata.pypi_package_name:
raise exc.AirbyteConnectorNotPyPiPublishedError(
Expand Down
Loading
Loading