Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions airbyte/http_caching/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,15 @@ def start(self) -> int:
)
self._addon = addon

proxy = DumpMaster(opts)
self._proxy = proxy
proxy.addons.add(addon)

def run_proxy() -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
proxy = DumpMaster(opts, loop=loop)
self._proxy = proxy
proxy.addons.add(addon)

loop.run_until_complete(proxy.run())
except Exception:
logger.exception("Error running proxy")
Expand Down
17 changes: 12 additions & 5 deletions airbyte/http_caching/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@
import json
import logging
from enum import Enum
from typing import TYPE_CHECKING, Any, Protocol
from typing import TYPE_CHECKING, Any, Protocol, cast

from mitmproxy.io import io


if TYPE_CHECKING:
from pathlib import Path

from mitmproxy.http import HTTPFlow


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -122,12 +128,13 @@ def deserialize(self, path: Path) -> T_SerializedData:

if flows and len(flows) > 0:
flow = flows[0]
http_flow = cast("HTTPFlow", flow)
return {
"type": "http",
"request": flow.request.get_state() if flow.request else {},
"response": flow.response.get_state() if flow.response else None,
"error": flow.error.get_state()
if hasattr(flow, "error") and flow.error
"request": http_flow.request.get_state() if http_flow.request else {},
"response": http_flow.response.get_state() if http_flow.response else None,
"error": http_flow.error.get_state()
if hasattr(http_flow, "error") and http_flow.error
else None,
}
except Exception as e:
Expand Down
44 changes: 35 additions & 9 deletions examples/run_pokeapi_cached.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,61 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""A simple test of PyAirbyte, using the PokeAPI source connector.
"""A simple test of PyAirbyte, using the PokeAPI source connector with HTTP caching.

Usage (from PyAirbyte root directory):
> poetry run python ./examples/run_pokeapi_cached.py

No setup is needed, but you may need to delete the .venv-source-pokeapi folder
if your installation gets interrupted or corrupted.

This example demonstrates HTTP caching using mitmproxy's native format.
It will make HTTP requests on the first run and use cached responses on subsequent runs.
"""

from __future__ import annotations

import asyncio
import os
import airbyte as ab
from pathlib import Path
from airbyte import get_source, AirbyteConnectorCache

cache_dir = Path(".airbyte-http-cache")
os.makedirs(cache_dir, exist_ok=True)
Path(cache_dir / ".gitignore").write_text("# Ignore all files in this directory\n*")

# Create an HTTP cache
cache = AirbyteConnectorCache(
http_cache = AirbyteConnectorCache(
cache_dir=cache_dir,
mode="read_write",
serialization_format="native", # Use mitmproxy's native format
)
cache.start()

source = get_source(
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

port = http_cache.start()
print(f"HTTP cache started on port {port}")

local_cache: ab.DuckDBCache = ab.new_local_cache("poke")

print("First run - making HTTP requests...")
source: ab.Source = get_source(
"source-pokeapi",
config={"pokemon_name": "bulbasaur"},
source_manifest=True,
http_cache=cache,
docker_image=True,
use_host_network=True,
http_cache=http_cache,
streams=["pokemon"],
)
source.check()
print("Source check successful")
source.read(cache=local_cache)
print("First run completed")

# # print(list(source.get_records("pokemon")))
source.read(cache=ab.new_local_cache("poke"))
print("Second run - should use cached responses...")
source.read(cache=local_cache)
print("Second run completed")

cache.stop()
print("Stopping HTTP cache...")
http_cache.stop()
print("HTTP cache stopped")
Loading