Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement HTTP caching with mitmproxy's native format #646

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
737c0c0
feat: implement HTTP caching for connectors
devin-ai-integration[bot] Mar 30, 2025
634f4de
fix: fix import ordering and type ignore issues
devin-ai-integration[bot] Mar 30, 2025
5fcfb92
chore: update poetry.lock
devin-ai-integration[bot] Mar 30, 2025
3a35bb1
fix: fix type annotation issues and unreachable code
devin-ai-integration[bot] Mar 30, 2025
61b4622
style: fix formatting and linting issues
devin-ai-integration[bot] Mar 30, 2025
e25a223
refactor: replace pickle with mitmproxy's native format for HTTP caching
devin-ai-integration[bot] Mar 30, 2025
e8696e4
fix: improve file path handling for Windows compatibility
devin-ai-integration[bot] Mar 30, 2025
b9fa560
docs: fix HAR format name in README
devin-ai-integration[bot] Mar 30, 2025
88a8895
chore: add `poe install` shortcut (side quest)
aaronsteers Mar 30, 2025
6755085
add cached example script
aaronsteers Mar 30, 2025
1e94da3
fix: resolve asyncio and serialization issues in HTTP caching example
devin-ai-integration[bot] Mar 30, 2025
82c4c83
(stacked pr): improve asyncio event loop issues in HTTP caching imple…
devin-ai-integration[bot] Apr 1, 2025
8e0cbba
refactor http_cache as property of executor
aaronsteers Apr 1, 2025
46cd360
mypy fixes
aaronsteers Apr 1, 2025
598af6e
working proxy redirect in docker container
aaronsteers Apr 1, 2025
a7b925c
refactor proxy implementation
aaronsteers Apr 2, 2025
9b78812
add cursor rules file
aaronsteers Apr 2, 2025
a343634
most functionality working 🚀 (except consolidation)
aaronsteers Apr 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions airbyte/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@
from airbyte.datasets import CachedDataset
from airbyte.destinations.base import Destination
from airbyte.destinations.util import get_destination
from airbyte.http_caching import AirbyteConnectorCache, HttpCacheMode
from airbyte.http_caching.serialization import SerializationFormat
from airbyte.records import StreamRecord
from airbyte.results import ReadResult, WriteResult
from airbyte.secrets import SecretSourceEnum, get_secret
Expand All @@ -154,6 +156,7 @@
documents,
exceptions, # noqa: ICN001 # No 'exc' alias for top-level module
experimental,
http_caching,
logs,
records,
results,
Expand All @@ -174,6 +177,7 @@
"documents",
"exceptions",
"experimental",
"http_caching",
"logs",
"records",
"registry",
Expand All @@ -189,12 +193,15 @@
"get_source",
"new_local_cache",
# Classes
"AirbyteConnectorCache",
"BigQueryCache",
"CachedDataset",
"Destination",
"DuckDBCache",
"HttpCacheMode",
"ReadResult",
"SecretSourceEnum",
"SerializationFormat",
"Source",
"StreamRecord",
"WriteResult",
Expand Down
3 changes: 2 additions & 1 deletion airbyte/_connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ def _execute(
stdin: IO[str] | AirbyteMessageIterator | None = None,
*,
progress_tracker: ProgressTracker | None = None,
env: dict[str, str] | None = None,
) -> Generator[AirbyteMessage, None, None]:
"""Execute the connector with the given arguments.

Expand Down Expand Up @@ -432,7 +433,7 @@ def _execute(
)

try:
for line in self.executor.execute(args, stdin=stdin):
for line in self.executor.execute(args, stdin=stdin, env=env):
try:
message: AirbyteMessage = AirbyteMessage.model_validate_json(json_data=line)
if progress_tracker and message.record:
Expand Down
5 changes: 5 additions & 0 deletions airbyte/_executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def _stream_from_subprocess(
*,
stdin: IO[str] | AirbyteMessageIterator | None = None,
log_file: IO[str] | None = None,
env: dict[str, str] | None = None,
) -> Generator[Iterable[str], None, None]:
"""Stream lines from a subprocess."""
input_thread: Thread | None = None
Expand All @@ -74,6 +75,7 @@ def _stream_from_subprocess(
stderr=log_file,
universal_newlines=True,
encoding="utf-8",
env=env,
)
input_thread = Thread(
target=_pump_input,
Expand Down Expand Up @@ -102,6 +104,7 @@ def _stream_from_subprocess(
stderr=log_file,
universal_newlines=True,
encoding="utf-8",
env=env,
)

if process.stdout is None:
Expand Down Expand Up @@ -198,6 +201,7 @@ def execute(
args: list[str],
*,
stdin: IO[str] | AirbyteMessageIterator | None = None,
env: dict[str, str] | None = None,
) -> Iterator[str]:
"""Execute a command and return an iterator of STDOUT lines.

Expand All @@ -207,6 +211,7 @@ def execute(
with _stream_from_subprocess(
[*self._cli, *mapped_args],
stdin=stdin,
env=env,
) as stream_lines:
yield from stream_lines

Expand Down
3 changes: 2 additions & 1 deletion airbyte/_executors/declarative.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,10 @@ def execute(
args: list[str],
*,
stdin: IO[str] | AirbyteMessageIterator | None = None,
env: dict[str, str] | None = None,
) -> Iterator[str]:
"""Execute the declarative source."""
_ = stdin # Not used
_ = stdin, env # Not used
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: We may need to (later, not here) implement a cache option for declarative execution. This runs through our same Python process, so we may either need to patch the requests library or have another implementation. Lmk if you have thoughts, but don't implement.

source_entrypoint = AirbyteEntrypoint(self.declarative_source)

mapped_args: list[str] = self.map_cli_args(args)
Expand Down
24 changes: 24 additions & 0 deletions airbyte/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,27 @@ def _str_to_bool(value: str) -> bool:
If not set, the default value is `False` for non-CI environments.
If running in a CI environment ("CI" env var is set), then the default value is `True`.
"""

DEFAULT_HTTP_CACHE_DIR: Path = (
Path(".airbyte-http-cache")
if "AIRBYTE_HTTP_CACHE_DIR" not in os.environ
else Path(os.environ["AIRBYTE_HTTP_CACHE_DIR"])
)
"""Default HTTP cache directory is `.airbyte-http-cache` in the current working directory.

The default location can be overridden by setting the `AIRBYTE_HTTP_CACHE_DIR` environment variable.

This directory is used to store cached HTTP requests and responses when using the HTTP caching
functionality with connectors.
"""

DEFAULT_HTTP_CACHE_READ_DIR: Path | None = (
None
if "AIRBYTE_HTTP_CACHE_READ_DIR" not in os.environ
else Path(os.environ["AIRBYTE_HTTP_CACHE_READ_DIR"])
)
"""Optional separate directory for reading HTTP cache files.

If set, this directory will be used for reading cached HTTP responses, while the
`DEFAULT_HTTP_CACHE_DIR` will be used for writing new cache files.
"""
71 changes: 71 additions & 0 deletions airbyte/http_caching/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# HTTP Caching for PyAirbyte

This module provides HTTP caching functionality for Airbyte connectors using mitmproxy's Python API.

## Serialization Formats

### Native Format

The native format is mitmproxy's native serialization format (.mitm). This format is used to store HTTP
flows (requests and responses) in a format that is natively readable by mitmproxy tools.
Using this format allows for interoperability with other tools in the mitmproxy ecosystem.

### JSON Format

The JSON format provides a human-readable alternative for storing HTTP flows. This makes it easier
to inspect and debug cached responses.

## HAR Format (Not Currently Implemented)

HAR (HTTP Archive) is a standard format for logging HTTP transactions in JSON format.
It's commonly used by browser developer tools and HTTP monitoring applications to capture
detailed information about web requests and responses. Some key features of HAR:

1. It records complete request and response data including headers, content, and timing
2. It's human-readable and can be analyzed with various tools
3. It's widely supported by browser developer tools and HTTP analysis applications

HAR format is not currently implemented in PyAirbyte's HTTP caching system, but may be considered
for future implementations where detailed HTTP transaction logging is important.

## Usage

```python
from airbyte import get_source, AirbyteConnectorCache

# Create an HTTP cache
cache = AirbyteConnectorCache(
mode="read_write",
serialization_format="native" # Use mitmproxy's native format
)

# Use the cache with a source
source = get_source(
name="source-github",
config={"repository": "airbytehq/airbyte"},
http_cache=cache
)

# The source will now use the HTTP cache for all requests
```

## Cache Modes

The HTTP cache supports four modes:

1. **Read Only**: Only read from the cache. If a request is not in the cache, it will be made to the server.
2. **Write Only**: Only write to the cache. All requests will be made to the server and cached.
3. **Read/Write**: Read from the cache if available, otherwise make the request to the server and cache it.
4. **Read Only Fail on Miss**: Only read from the cache. If a request is not in the cache, an exception will be raised.

## Configuration

By default, cache files are stored in a local directory called `.airbyte-http-cache`. This can be overridden using:

1. The `cache_dir` parameter when creating an `AirbyteConnectorCache` instance
2. The `AIRBYTE_HTTP_CACHE_DIR` environment variable

For separate read and write directories, you can use:

1. The `read_dir` parameter when creating an `AirbyteConnectorCache` instance
2. The `AIRBYTE_HTTP_CACHE_READ_DIR` environment variable
10 changes: 10 additions & 0 deletions airbyte/http_caching/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""HTTP caching module for Airbyte connectors."""

from __future__ import annotations

from airbyte.http_caching.cache import AirbyteConnectorCache, HttpCacheMode
from airbyte.http_caching.serialization import SerializationFormat


__all__ = ["AirbyteConnectorCache", "HttpCacheMode", "SerializationFormat"]
112 changes: 112 additions & 0 deletions airbyte/http_caching/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""HTTP caching for Airbyte connectors."""

from __future__ import annotations

import threading
import time
from pathlib import Path
from typing import cast

from mitmproxy import options
from mitmproxy.tools.dump import DumpMaster

from airbyte.constants import DEFAULT_HTTP_CACHE_DIR, DEFAULT_HTTP_CACHE_READ_DIR
from airbyte.http_caching.proxy import HttpCacheMode, HttpCachingAddon
from airbyte.http_caching.serialization import SerializationFormat


class AirbyteConnectorCache:
    """Cache for Airbyte connector HTTP requests and responses.

    This class manages an HTTP proxy that intercepts requests from connectors and either
    serves them from the cache or forwards them to the server based on the cache mode.
    """

    def __init__(
        self,
        cache_dir: str | Path | None = None,
        read_dir: str | Path | None = None,
        mode: str | HttpCacheMode = HttpCacheMode.READ_WRITE,
        serialization_format: str | SerializationFormat = SerializationFormat.NATIVE,
    ) -> None:
        """Initialize the cache.

        Args:
            cache_dir: The directory where cache files are stored for writing. If not provided,
                the default directory will be used.
            read_dir: Optional separate directory for reading cached responses. If not provided,
                cache_dir will be used for both reading and writing.
            mode: The cache mode to use. Can be one of 'read_only', 'write_only',
                'read_write', or 'read_only_fail_on_miss'.
            serialization_format: The format to use for serializing cached data. Can be
                'native' (mitmproxy's .mitm format) or 'json'.
        """
        self.cache_dir = Path(cache_dir) if cache_dir else DEFAULT_HTTP_CACHE_DIR

        # Read-directory resolution order: explicit argument, then the
        # AIRBYTE_HTTP_CACHE_READ_DIR environment override, then the write directory.
        if read_dir:
            self.read_dir = Path(read_dir)
        elif DEFAULT_HTTP_CACHE_READ_DIR:
            self.read_dir = DEFAULT_HTTP_CACHE_READ_DIR
        else:
            self.read_dir = self.cache_dir

        self.mode = HttpCacheMode(mode) if isinstance(mode, str) else mode

        self.serialization_format = (
            SerializationFormat(serialization_format)
            if isinstance(serialization_format, str)
            else serialization_format
        )

        # Runtime proxy state; populated by start() and cleared by stop().
        self._proxy_port: int | None = None
        self._proxy_thread: threading.Thread | None = None
        self._proxy: DumpMaster | None = None
        self._addon: HttpCachingAddon | None = None

    def start(self) -> int:
        """Start the HTTP proxy in a background daemon thread.

        Idempotent: if the proxy is already running, the existing port is returned.

        Returns:
            The port number the proxy is listening on.

        Raises:
            RuntimeError: If the proxy does not report a bound port within the
                startup timeout.
        """
        if self._proxy_port is not None:
            return self._proxy_port

        opts = options.Options(
            listen_host="127.0.0.1",
            listen_port=0,  # Let the OS choose a free port.
            ssl_insecure=True,  # Allow self-signed certificates
            confdir=str(self.cache_dir),  # Store certificates in the cache directory
        )

        addon = HttpCachingAddon(
            cache_dir=self.cache_dir,
            read_dir=self.read_dir,
            mode=self.mode,
            serialization_format=self.serialization_format,
        )
        self._addon = addon

        proxy = DumpMaster(opts)
        self._proxy = proxy
        proxy.addons.add(addon)

        thread = threading.Thread(target=proxy.run, daemon=True)
        self._proxy_thread = thread
        thread.start()

        # The listener binds asynchronously inside the proxy thread, and since we
        # requested port 0 the real port is unknown until the bind completes. Poll
        # for it instead of reading the address immediately, which was racy.
        port_number = self._wait_for_port(proxy)
        self._proxy_port = port_number

        return port_number

    @staticmethod
    def _wait_for_port(proxy: DumpMaster, timeout: float = 10.0) -> int:
        """Poll the proxy until it reports a bound, nonzero listen port.

        Raises:
            RuntimeError: If no port is reported before `timeout` seconds elapse.
        """
        deadline = time.monotonic() + timeout
        while True:
            try:
                port_number = cast("int", proxy.server.address[1])  # type: ignore[attr-defined]
            except AttributeError:
                # Server object not created yet; keep waiting.
                port_number = 0
            if port_number:
                return port_number
            if time.monotonic() >= deadline:
                msg = "HTTP cache proxy did not start listening within the expected time."
                raise RuntimeError(msg)
            time.sleep(0.05)

    def stop(self) -> None:
        """Stop the HTTP proxy (if running) and reset all runtime state."""
        if self._proxy is not None:
            self._proxy.shutdown()
        self._proxy_thread = None
        self._proxy = None
        self._addon = None
        self._proxy_port = None
Loading
Loading