diff --git a/airbyte/http_caching/cache.py b/airbyte/http_caching/cache.py index d93df42b..f46d0e74 100644 --- a/airbyte/http_caching/cache.py +++ b/airbyte/http_caching/cache.py @@ -94,14 +94,15 @@ def start(self) -> int: ) self._addon = addon - proxy = DumpMaster(opts) - self._proxy = proxy - proxy.addons.add(addon) - def run_proxy() -> None: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) + try: + proxy = DumpMaster(opts, loop=loop) + self._proxy = proxy + proxy.addons.add(addon) + loop.run_until_complete(proxy.run()) except Exception: logger.exception("Error running proxy") diff --git a/airbyte/http_caching/serialization.py b/airbyte/http_caching/serialization.py index 7f7a7850..9779f533 100644 --- a/airbyte/http_caching/serialization.py +++ b/airbyte/http_caching/serialization.py @@ -6,11 +6,17 @@ import json import logging from enum import Enum -from typing import TYPE_CHECKING, Any, Protocol +from typing import TYPE_CHECKING, Any, Protocol, cast from mitmproxy.io import io +if TYPE_CHECKING: + from pathlib import Path + + from mitmproxy.http import HTTPFlow + + logger = logging.getLogger(__name__) @@ -122,12 +128,13 @@ def deserialize(self, path: Path) -> T_SerializedData: if flows and len(flows) > 0: flow = flows[0] + http_flow = cast("HTTPFlow", flow) return { "type": "http", - "request": flow.request.get_state() if flow.request else {}, - "response": flow.response.get_state() if flow.response else None, - "error": flow.error.get_state() - if hasattr(flow, "error") and flow.error + "request": http_flow.request.get_state() if http_flow.request else {}, + "response": http_flow.response.get_state() if http_flow.response else None, + "error": http_flow.error.get_state() + if hasattr(http_flow, "error") and http_flow.error else None, } except Exception as e: diff --git a/examples/run_pokeapi_cached.py b/examples/run_pokeapi_cached.py index 4a73713f..fc16486e 100644 --- a/examples/run_pokeapi_cached.py +++ b/examples/run_pokeapi_cached.py @@ -1,35 +1,61 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. -"""A simple test of PyAirbyte, using the PokeAPI source connector. +"""A simple test of PyAirbyte, using the PokeAPI source connector with HTTP caching. Usage (from PyAirbyte root directory): > poetry run python ./examples/run_pokeapi_cached.py No setup is needed, but you may need to delete the .venv-source-pokeapi folder if your installation gets interrupted or corrupted. + +This example demonstrates HTTP caching using mitmproxy's native format. +It will make HTTP requests on the first run and use cached responses on subsequent runs. """ from __future__ import annotations +import asyncio +import os import airbyte as ab +from pathlib import Path from airbyte import get_source, AirbyteConnectorCache +cache_dir = Path(".airbyte-http-cache") +os.makedirs(cache_dir, exist_ok=True) +Path(cache_dir / ".gitignore").write_text("# Ignore all files in this directory\n*") + # Create an HTTP cache -cache = AirbyteConnectorCache( +http_cache = AirbyteConnectorCache( + cache_dir=cache_dir, mode="read_write", serialization_format="native", # Use mitmproxy's native format ) -cache.start() -source = get_source( +loop = asyncio.new_event_loop() +asyncio.set_event_loop(loop) + +port = http_cache.start() +print(f"HTTP cache started on port {port}") + +local_cache: ab.DuckDBCache = ab.new_local_cache("poke") + +print("First run - making HTTP requests...") +source: ab.Source = get_source( "source-pokeapi", config={"pokemon_name": "bulbasaur"}, - source_manifest=True, - http_cache=cache, + docker_image=True, + use_host_network=True, + http_cache=http_cache, streams=["pokemon"], ) source.check() +print("Source check successful") +source.read(cache=local_cache) +print("First run completed") -# # print(list(source.get_records("pokemon"))) -source.read(cache=ab.new_local_cache("poke")) +print("Second run - should use cached responses...") +source.read(cache=local_cache) +print("Second run completed") -cache.stop() +print("Stopping HTTP cache...") +http_cache.stop() +print("HTTP cache stopped")