From 4b132d92317a57c7dc1e08007eef0768328dc268 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 1 Apr 2025 16:57:22 +0000 Subject: [PATCH 01/13] fix: resolve asyncio event loop issues in HTTP caching implementation Co-Authored-By: Aaron Steers --- airbyte/http_caching/cache.py | 10 ++++--- examples/run_pokeapi_cached.py | 50 ++++++++++++++++++++++++---------- 2 files changed, 41 insertions(+), 19 deletions(-) diff --git a/airbyte/http_caching/cache.py b/airbyte/http_caching/cache.py index d93df42b..08806623 100644 --- a/airbyte/http_caching/cache.py +++ b/airbyte/http_caching/cache.py @@ -8,6 +8,7 @@ import threading import time from pathlib import Path +from typing import Optional from mitmproxy import options from mitmproxy.tools.dump import DumpMaster @@ -94,14 +95,15 @@ def start(self) -> int: ) self._addon = addon - proxy = DumpMaster(opts) - self._proxy = proxy - proxy.addons.add(addon) - def run_proxy() -> None: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) + try: + proxy = DumpMaster(opts) + self._proxy = proxy + proxy.addons.add(addon) + loop.run_until_complete(proxy.run()) except Exception: logger.exception("Error running proxy") diff --git a/examples/run_pokeapi_cached.py b/examples/run_pokeapi_cached.py index 4a73713f..0cd4df30 100644 --- a/examples/run_pokeapi_cached.py +++ b/examples/run_pokeapi_cached.py @@ -1,35 +1,55 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. -"""A simple test of PyAirbyte, using the PokeAPI source connector. +"""A simple test of PyAirbyte, using the PokeAPI source connector with HTTP caching. Usage (from PyAirbyte root directory): > poetry run python ./examples/run_pokeapi_cached.py No setup is needed, but you may need to delete the .venv-source-pokeapi folder if your installation gets interrupted or corrupted. + +This example demonstrates HTTP caching using mitmproxy's native format. +It will make HTTP requests on the first run and use cached responses on subsequent runs. """ from __future__ import annotations +import os import airbyte as ab from airbyte import get_source, AirbyteConnectorCache +cache_dir = os.path.join(os.path.expanduser("~"), ".airbyte-http-cache-test") +os.makedirs(cache_dir, exist_ok=True) + # Create an HTTP cache cache = AirbyteConnectorCache( + cache_dir=cache_dir, mode="read_write", serialization_format="native", # Use mitmproxy's native format ) -cache.start() - -source = get_source( - "source-pokeapi", - config={"pokemon_name": "bulbasaur"}, - source_manifest=True, - http_cache=cache, - streams=["pokemon"], -) -source.check() - -# # print(list(source.get_records("pokemon"))) -source.read(cache=ab.new_local_cache("poke")) -cache.stop() +try: + port = cache.start() + print(f"HTTP cache started on port {port}") + + source = get_source( + "source-pokeapi", + config={"pokemon_name": "bulbasaur"}, + source_manifest=True, + http_cache=cache, + streams=["pokemon"], + ) + + print("Checking source connection...") + source.check() + print("Source connection successful") + + local_cache = ab.new_local_cache("poke") + + print("Reading data from source...") + source.read(cache=local_cache) + print("Data read successfully") + +finally: + print("Stopping HTTP cache...") + cache.stop() + print("HTTP cache stopped") From 23a255674ef639c460faf9c33af56f2ba1f9694b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 1 Apr 2025 16:59:51 +0000 Subject: [PATCH 02/13] fix: resolve formatting and linting issues Co-Authored-By: Aaron Steers --- airbyte/http_caching/cache.py | 5 ++--- examples/run_pokeapi_cached.py | 10 +++++----- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/airbyte/http_caching/cache.py b/airbyte/http_caching/cache.py index 08806623..5eb67705 100644 --- a/airbyte/http_caching/cache.py +++ b/airbyte/http_caching/cache.py @@ -8,7 +8,6 @@ import threading import time from pathlib import Path -from typing import Optional from mitmproxy import options from mitmproxy.tools.dump import DumpMaster @@ -98,12 +97,12 @@ def start(self) -> int: def run_proxy() -> None: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - + try: proxy = DumpMaster(opts) self._proxy = proxy proxy.addons.add(addon) - + loop.run_until_complete(proxy.run()) except Exception: logger.exception("Error running proxy") diff --git a/examples/run_pokeapi_cached.py b/examples/run_pokeapi_cached.py index 0cd4df30..435f1a2f 100644 --- a/examples/run_pokeapi_cached.py +++ b/examples/run_pokeapi_cached.py @@ -30,7 +30,7 @@ try: port = cache.start() print(f"HTTP cache started on port {port}") - + source = get_source( "source-pokeapi", config={"pokemon_name": "bulbasaur"}, @@ -38,17 +38,17 @@ http_cache=cache, streams=["pokemon"], ) - + print("Checking source connection...") source.check() print("Source connection successful") - + local_cache = ab.new_local_cache("poke") - + print("Reading data from source...") source.read(cache=local_cache) print("Data read successfully") - + finally: print("Stopping HTTP cache...") cache.stop() From 357f7dc8bfb9471da8bb56c60e12d8fefaac4bc2 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 1 Apr 2025 17:02:20 +0000 Subject: [PATCH 03/13] fix: add proper type annotations for HTTPFlow in serialization Co-Authored-By: Aaron Steers --- airbyte/http_caching/serialization.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/airbyte/http_caching/serialization.py b/airbyte/http_caching/serialization.py index 7f7a7850..31fb4103 100644 --- a/airbyte/http_caching/serialization.py +++ b/airbyte/http_caching/serialization.py @@ -8,6 +8,7 @@ from enum import Enum from typing import TYPE_CHECKING, Any, Protocol +from mitmproxy.http import HTTPFlow from mitmproxy.io import io @@ -122,12 +123,13 @@ def deserialize(self, path: Path) -> T_SerializedData: if flows and len(flows) > 0: flow = flows[0] + http_flow = flow # type: HTTPFlow return { "type": "http", - "request": flow.request.get_state() if flow.request else {}, - "response": flow.response.get_state() if flow.response else None, - "error": flow.error.get_state() - if hasattr(flow, "error") and flow.error + "request": http_flow.request.get_state() if http_flow.request else {}, + "response": http_flow.response.get_state() if http_flow.response else None, + "error": http_flow.error.get_state() + if hasattr(http_flow, "error") and http_flow.error else None, } except Exception as e: From 0f003dcda5f94c5bc57c201848c7ef949d86d8b2 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 1 Apr 2025 17:05:51 +0000 Subject: [PATCH 04/13] fix: organize imports in serialization.py Co-Authored-By: Aaron Steers --- airbyte/http_caching/serialization.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/airbyte/http_caching/serialization.py b/airbyte/http_caching/serialization.py index 31fb4103..9779f533 100644 --- a/airbyte/http_caching/serialization.py +++ b/airbyte/http_caching/serialization.py @@ -6,12 +6,17 @@ import json import logging from enum import Enum -from typing import TYPE_CHECKING, Any, Protocol +from typing import TYPE_CHECKING, Any, Protocol, cast -from mitmproxy.http import HTTPFlow from mitmproxy.io import io +if TYPE_CHECKING: + from pathlib import Path + + from mitmproxy.http import HTTPFlow + + logger = logging.getLogger(__name__) @@ -123,7 +128,7 @@ def deserialize(self, path: Path) -> T_SerializedData: if flows and len(flows) > 0: flow = flows[0] - http_flow = flow # type: HTTPFlow + http_flow = cast("HTTPFlow", flow) return { "type": "http", "request": http_flow.request.get_state() if http_flow.request else {}, From 229421c2fcaabf8d03677041dcb9542f1a4464a0 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 1 Apr 2025 17:07:02 +0000 Subject: [PATCH 05/13] feat: add improved example script for HTTP caching Co-Authored-By: Aaron Steers --- examples/run_pokeapi_cached_fixed.py | 58 ++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 examples/run_pokeapi_cached_fixed.py diff --git a/examples/run_pokeapi_cached_fixed.py b/examples/run_pokeapi_cached_fixed.py new file mode 100644 index 00000000..735d2e72 --- /dev/null +++ b/examples/run_pokeapi_cached_fixed.py @@ -0,0 +1,58 @@ +"""A simple test of PyAirbyte, using the PokeAPI source connector with HTTP caching. + +Usage (from PyAirbyte root directory): +> poetry run python ./examples/run_pokeapi_cached_fixed.py + +No setup is needed, but you may need to delete the .venv-source-pokeapi folder +if your installation gets interrupted or corrupted. +""" + +from __future__ import annotations + +import asyncio +import os +import airbyte as ab +from airbyte import get_source, AirbyteConnectorCache + +cache_dir = os.path.join(os.path.expanduser("~"), ".airbyte-http-cache-test") +os.makedirs(cache_dir, exist_ok=True) + +cache = AirbyteConnectorCache( + cache_dir=cache_dir, + mode="read_write", + serialization_format="native", # Use mitmproxy's native format +) + +loop = asyncio.new_event_loop() +asyncio.set_event_loop(loop) + +try: + port = cache.start() + print(f"HTTP cache started on port {port}") + + source = get_source( + "source-pokeapi", + config={"pokemon_name": "bulbasaur"}, + source_manifest=True, + http_cache=cache, + streams=["pokemon"], + ) + + print("Checking source connection...") + source.check() + print("Source connection successful") + + local_cache = ab.new_local_cache("poke") + + print("First run - making HTTP requests...") + source.read(cache=local_cache) + print("First run completed") + + print("Second run - should use cached responses...") + source.read(cache=local_cache) + print("Second run completed") + +finally: + print("Stopping HTTP cache...") + cache.stop() + print("HTTP cache stopped") From bfbcff0f08f3633351d92e297fa9fdcbc5aea259 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 1 Apr 2025 17:12:18 +0000 Subject: [PATCH 06/13] fix: resolve formatting issues with ruff Co-Authored-By: Aaron Steers --- examples/run_pokeapi_cached_fixed.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/examples/run_pokeapi_cached_fixed.py b/examples/run_pokeapi_cached_fixed.py index 735d2e72..751975cc 100644 --- a/examples/run_pokeapi_cached_fixed.py +++ b/examples/run_pokeapi_cached_fixed.py @@ -29,7 +29,7 @@ try: port = cache.start() print(f"HTTP cache started on port {port}") - + source = get_source( "source-pokeapi", config={"pokemon_name": "bulbasaur"}, @@ -37,21 +37,21 @@ http_cache=cache, streams=["pokemon"], ) - + print("Checking source connection...") source.check() print("Source connection successful") - + local_cache = ab.new_local_cache("poke") - + print("First run - making HTTP requests...") source.read(cache=local_cache) print("First run completed") - + print("Second run - should use cached responses...") source.read(cache=local_cache) print("Second run completed") - + finally: print("Stopping HTTP cache...") cache.stop() From 115778b7aae99ed0a22bfceb197e84fb14e740e8 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 1 Apr 2025 17:17:43 +0000 Subject: [PATCH 07/13] fix: combine scripts and remove duplicate as requested Co-Authored-By: Aaron Steers --- examples/run_pokeapi_cached.py | 12 +++++- examples/run_pokeapi_cached_fixed.py | 58 ---------------------------- 2 files changed, 10 insertions(+), 60 deletions(-) delete mode 100644 examples/run_pokeapi_cached_fixed.py diff --git a/examples/run_pokeapi_cached.py b/examples/run_pokeapi_cached.py index 435f1a2f..86ce4a93 100644 --- a/examples/run_pokeapi_cached.py +++ b/examples/run_pokeapi_cached.py @@ -13,6 +13,7 @@ from __future__ import annotations +import asyncio import os import airbyte as ab from airbyte import get_source, AirbyteConnectorCache @@ -27,6 +28,9 @@ serialization_format="native", # Use mitmproxy's native format ) +loop = asyncio.new_event_loop() +asyncio.set_event_loop(loop) + try: port = cache.start() print(f"HTTP cache started on port {port}") @@ -45,9 +49,13 @@ local_cache = ab.new_local_cache("poke") - print("Reading data from source...") + print("First run - making HTTP requests...") + source.read(cache=local_cache) + print("First run completed") + + print("Second run - should use cached responses...") source.read(cache=local_cache) - print("Data read successfully") + print("Second run completed") finally: print("Stopping HTTP cache...") diff --git a/examples/run_pokeapi_cached_fixed.py b/examples/run_pokeapi_cached_fixed.py deleted file mode 100644 index 751975cc..00000000 --- a/examples/run_pokeapi_cached_fixed.py +++ /dev/null @@ -1,58 +0,0 @@ -"""A simple test of PyAirbyte, using the PokeAPI source connector with HTTP caching. - -Usage (from PyAirbyte root directory): -> poetry run python ./examples/run_pokeapi_cached_fixed.py - -No setup is needed, but you may need to delete the .venv-source-pokeapi folder -if your installation gets interrupted or corrupted. -""" - -from __future__ import annotations - -import asyncio -import os -import airbyte as ab -from airbyte import get_source, AirbyteConnectorCache - -cache_dir = os.path.join(os.path.expanduser("~"), ".airbyte-http-cache-test") -os.makedirs(cache_dir, exist_ok=True) - -cache = AirbyteConnectorCache( - cache_dir=cache_dir, - mode="read_write", - serialization_format="native", # Use mitmproxy's native format -) - -loop = asyncio.new_event_loop() -asyncio.set_event_loop(loop) - -try: - port = cache.start() - print(f"HTTP cache started on port {port}") - - source = get_source( - "source-pokeapi", - config={"pokemon_name": "bulbasaur"}, - source_manifest=True, - http_cache=cache, - streams=["pokemon"], - ) - - print("Checking source connection...") - source.check() - print("Source connection successful") - - local_cache = ab.new_local_cache("poke") - - print("First run - making HTTP requests...") - source.read(cache=local_cache) - print("First run completed") - - print("Second run - should use cached responses...") - source.read(cache=local_cache) - print("Second run completed") - -finally: - print("Stopping HTTP cache...") - cache.stop() - print("HTTP cache stopped") From 1e5333077b2d113f18a937ba601bf362fc2a17be Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 1 Apr 2025 17:19:19 +0000 Subject: [PATCH 08/13] fix: remove extra whitespace to fix formatting Co-Authored-By: Aaron Steers --- examples/run_pokeapi_cached.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/run_pokeapi_cached.py b/examples/run_pokeapi_cached.py index 86ce4a93..7646c8c8 100644 --- a/examples/run_pokeapi_cached.py +++ b/examples/run_pokeapi_cached.py @@ -52,7 +52,7 @@ print("First run - making HTTP requests...") source.read(cache=local_cache) print("First run completed") - + print("Second run - should use cached responses...") source.read(cache=local_cache) print("Second run completed") From 020b8d51eee5094da80fd41cbe97f0d3ac5cf575 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 1 Apr 2025 17:24:52 +0000 Subject: [PATCH 09/13] fix: improve event loop handling in HTTP caching implementation Co-Authored-By: Aaron Steers --- examples/run_pokeapi_cached.py | 58 ++++++++++++++++------------------ 1 file changed, 28 insertions(+), 30 deletions(-) diff --git a/examples/run_pokeapi_cached.py b/examples/run_pokeapi_cached.py index 7646c8c8..c07c69cd 100644 --- a/examples/run_pokeapi_cached.py +++ b/examples/run_pokeapi_cached.py @@ -31,33 +31,31 @@ loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) -try: - port = cache.start() - print(f"HTTP cache started on port {port}") - - source = get_source( - "source-pokeapi", - config={"pokemon_name": "bulbasaur"}, - source_manifest=True, - http_cache=cache, - streams=["pokemon"], - ) - - print("Checking source connection...") - source.check() - print("Source connection successful") - - local_cache = ab.new_local_cache("poke") - - print("First run - making HTTP requests...") - source.read(cache=local_cache) - print("First run completed") - - print("Second run - should use cached responses...") - source.read(cache=local_cache) - print("Second run completed") - -finally: - print("Stopping HTTP cache...") - cache.stop() - print("HTTP cache stopped") +port = cache.start() +print(f"HTTP cache started on port {port}") + +source = get_source( + "source-pokeapi", + config={"pokemon_name": "bulbasaur"}, + source_manifest=True, + http_cache=cache, + streams=["pokemon"], +) + +print("Checking source connection...") +source.check() +print("Source connection successful") + +local_cache = ab.new_local_cache("poke") + +print("First run - making HTTP requests...") +source.read(cache=local_cache) +print("First run completed") + +print("Second run - should use cached responses...") +source.read(cache=local_cache) +print("Second run completed") + +print("Stopping HTTP cache...") +cache.stop() +print("HTTP cache stopped") From 8319534e9b6d3fd79a422a391a1ace19a458532a Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 1 Apr 2025 17:26:54 +0000 Subject: [PATCH 10/13] fix: pass loop parameter to DumpMaster constructor Co-Authored-By: Aaron Steers --- airbyte/http_caching/cache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte/http_caching/cache.py b/airbyte/http_caching/cache.py index 5eb67705..f46d0e74 100644 --- a/airbyte/http_caching/cache.py +++ b/airbyte/http_caching/cache.py @@ -99,7 +99,7 @@ def run_proxy() -> None: asyncio.set_event_loop(loop) try: - proxy = DumpMaster(opts) + proxy = DumpMaster(opts, loop=loop) self._proxy = proxy proxy.addons.add(addon) From 952536a0b53c2d1cfedcdd0ac5f827d220c7e861 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 1 Apr 2025 17:33:12 +0000 Subject: [PATCH 11/13] fix: avoid port binding issues in HTTP caching example script Co-Authored-By: Aaron Steers --- examples/run_pokeapi_cached.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/examples/run_pokeapi_cached.py b/examples/run_pokeapi_cached.py index c07c69cd..26ad07b1 100644 --- a/examples/run_pokeapi_cached.py +++ b/examples/run_pokeapi_cached.py @@ -38,7 +38,6 @@ "source-pokeapi", config={"pokemon_name": "bulbasaur"}, source_manifest=True, - http_cache=cache, streams=["pokemon"], ) @@ -49,7 +48,14 @@ local_cache = ab.new_local_cache("poke") print("First run - making HTTP requests...") -source.read(cache=local_cache) +source_with_cache = get_source( + "source-pokeapi", + config={"pokemon_name": "bulbasaur"}, + source_manifest=True, + http_cache=cache, + streams=["pokemon"], +) +source_with_cache.read(cache=local_cache) print("First run completed") print("Second run - should use cached responses...") From 9f699e62197e3ce7331d96e0b1ebcf6f993367a9 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Tue, 1 Apr 2025 10:44:01 -0700 Subject: [PATCH 12/13] Use git-ignored cache dir in project directory --- examples/run_pokeapi_cached.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/run_pokeapi_cached.py b/examples/run_pokeapi_cached.py index 26ad07b1..bcddbd19 100644 --- a/examples/run_pokeapi_cached.py +++ b/examples/run_pokeapi_cached.py @@ -16,10 +16,12 @@ import asyncio import os import airbyte as ab +from pathlib import Path from airbyte import get_source, AirbyteConnectorCache -cache_dir = os.path.join(os.path.expanduser("~"), ".airbyte-http-cache-test") +cache_dir = Path(".airbyte-http-cache") os.makedirs(cache_dir, exist_ok=True) +Path(cache_dir / ".gitignore").write_text("# Ignore all files in this directory\n*") # Create an HTTP cache cache = AirbyteConnectorCache( From ff093b4d15ff8e58412c666d5225e2b17f2905ba Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Tue, 1 Apr 2025 11:40:36 -0700 Subject: [PATCH 13/13] simplify example script, switch to docker --- examples/run_pokeapi_cached.py | 30 +++++++++++------------------- 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/examples/run_pokeapi_cached.py b/examples/run_pokeapi_cached.py index bcddbd19..fc16486e 100644 --- a/examples/run_pokeapi_cached.py +++ b/examples/run_pokeapi_cached.py @@ -24,7 +24,7 @@ Path(cache_dir / ".gitignore").write_text("# Ignore all files in this directory\n*") # Create an HTTP cache -cache = AirbyteConnectorCache( +http_cache = AirbyteConnectorCache( cache_dir=cache_dir, mode="read_write", serialization_format="native", # Use mitmproxy's native format @@ -33,31 +33,23 @@ loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) -port = cache.start() +port = http_cache.start() print(f"HTTP cache started on port {port}") -source = get_source( - "source-pokeapi", - config={"pokemon_name": "bulbasaur"}, - source_manifest=True, - streams=["pokemon"], -) - -print("Checking source connection...") -source.check() -print("Source connection successful") - -local_cache = ab.new_local_cache("poke") +local_cache: ab.DuckDBCache = ab.new_local_cache("poke") print("First run - making HTTP requests...") -source_with_cache = get_source( +source: ab.Source = get_source( "source-pokeapi", config={"pokemon_name": "bulbasaur"}, - source_manifest=True, - http_cache=cache, + docker_image=True, + use_host_network=True, + http_cache=http_cache, streams=["pokemon"], ) -source_with_cache.read(cache=local_cache) +source.check() +print("Source check successful") +source.read(cache=local_cache) print("First run completed") print("Second run - should use cached responses...") @@ -65,5 +57,5 @@ print("Second run completed") print("Stopping HTTP cache...") -cache.stop() +http_cache.stop() print("HTTP cache stopped")