48 commits
a663037
save bananza mode wip
aaronsteers Jul 31, 2025
68e1df6
feat: Complete bananza mode lake storage implementation
devin-ai-integration[bot] Aug 8, 2025
d5e6713
feat: Add warehouse sizing variables and scaling analysis
devin-ai-integration[bot] Aug 8, 2025
14457bc
fix: Resolve linting and type checking issues
devin-ai-integration[bot] Aug 8, 2025
54069f2
feat: Complete fast lake copy implementation with warehouse scaling
devin-ai-integration[bot] Aug 8, 2025
504eb39
feat: Add comprehensive timestamps and elapsed time to fast lake copy…
devin-ai-integration[bot] Aug 8, 2025
9a69509
feat: Scale faker data to 10 million rows and process only purchases …
devin-ai-integration[bot] Aug 8, 2025
8db85a6
fix: Add force_full_refresh=True to ensure all 10M records are processed
devin-ai-integration[bot] Aug 8, 2025
65d1628
feat: Add detailed performance metrics with records/s and MB/s for ea…
devin-ai-integration[bot] Aug 8, 2025
386f254
feat: Scale to 50 million records and add write_strategy=replace para…
devin-ai-integration[bot] Aug 8, 2025
4d70666
feat: Configure second run with 2XLARGE warehouse and skip initial da…
devin-ai-integration[bot] Aug 8, 2025
36f9fa8
fix: Add S3 eventual consistency delay and increase file descriptor l…
devin-ai-integration[bot] Aug 8, 2025
158572a
fix: Use COMPUTE_WH_LARGE instead of non-existent COMPUTE_WH_2XLARGE …
devin-ai-integration[bot] Aug 8, 2025
da22d77
feat: Update to COMPUTE_WH_2XLARGE warehouse and add Snowflake CPU mi…
devin-ai-integration[bot] Aug 8, 2025
57a3376
feat: Update warehouse configuration and add CPU minutes analysis
devin-ai-integration[bot] Aug 8, 2025
d1779f3
feat: Configure COMPUTE_WH_2XLARGE warehouse for 32x performance test
devin-ai-integration[bot] Aug 8, 2025
e0f4375
feat: Switch to co-located S3 bucket in US West 2 to match Snowflake …
devin-ai-integration[bot] Aug 8, 2025
9a9a23d
fix: Use existing accessible S3 bucket ab-destiantion-iceberg-us-west-2
devin-ai-integration[bot] Aug 8, 2025
a1c1c0c
feat: Add unload_table_to_lake() method for arbitrary table unloads
devin-ai-integration[bot] Aug 8, 2025
cbbf530
refactor: Make unload_stream_to_lake() call unload_table_to_lake() to…
devin-ai-integration[bot] Aug 8, 2025
de339fd
fix: Shorten parameter descriptions to fix line length linting issues
devin-ai-integration[bot] Aug 8, 2025
9fe4829
feat: Remove arrow-based write_dataset() and read_dataset() methods f…
devin-ai-integration[bot] Aug 8, 2025
16f5226
refactor: Make unload_stream_to_lake() generic in base class, move cu…
devin-ai-integration[bot] Aug 8, 2025
0f182f3
tidy up implementation
aaronsteers Aug 8, 2025
bc415da
add FastUnloadResultObject
aaronsteers Aug 8, 2025
5471786
add ability to load from an unload result
aaronsteers Aug 8, 2025
8dff561
fix: Resolve import errors and bugs preventing fast lake copy script …
devin-ai-integration[bot] Aug 8, 2025
c0bc120
toggle on reload, expand to 100MM sample records
aaronsteers Aug 12, 2025
30664de
fix: source config was not using new constant
aaronsteers Aug 12, 2025
34d4351
remove nonsense metric
aaronsteers Aug 12, 2025
297674d
feat: Toggle RELOAD_INITIAL_SOURCE_DATA to False after 100MM dataset …
devin-ai-integration[bot] Aug 12, 2025
f6cc1ea
feat: Implement robust COPY INTO metadata capture using RESULT_SCAN()
devin-ai-integration[bot] Aug 12, 2025
6b67ed1
feat: Enhance FastUnloadResult with actual record counts from RESULT_…
devin-ai-integration[bot] Aug 12, 2025
cf50d64
fix: Add noqa comment for necessary SQLAlchemy _mapping access
devin-ai-integration[bot] Aug 12, 2025
68c5fff
feat: Add files count and manifest display to fast lake copy example …
devin-ai-integration[bot] Aug 12, 2025
597c1a5
fix: Remove unnecessary f-string prefixes from static print statements
devin-ai-integration[bot] Aug 12, 2025
53bb076
Merge branch 'aj/feat/bananza-mode-replication' of https://github.com…
aaronsteers Aug 13, 2025
14b468c
feat: Add multi-warehouse performance analysis with timestamped S3 paths
devin-ai-integration[bot] Aug 13, 2025
103ecc7
Merge branch 'aj/feat/bananza-mode-replication' of https://git-manage…
devin-ai-integration[bot] Aug 13, 2025
6799539
feat: Remove cost efficiency and scaling efficiency tables from perfo…
devin-ai-integration[bot] Aug 13, 2025
d38423a
feat: Implement FastLoadResult class with Snowflake COPY INTO metadat…
devin-ai-integration[bot] Aug 13, 2025
47aa505
fix: Update table qualification logic in Snowflake fast_unload_table …
devin-ai-integration[bot] Aug 13, 2025
836d932
feat: Add FastLoadResult validation and test scripts
devin-ai-integration[bot] Aug 13, 2025
5d8ae55
feat: Add debug logging to compare unload vs load file processing
devin-ai-integration[bot] Aug 13, 2025
435e72d
feat: Enable debug logging with smaller dataset for load timeout debu…
devin-ai-integration[bot] Aug 13, 2025
3ea679a
fix: Prepare script for 100M dataset reload after accidental deletion
devin-ai-integration[bot] Aug 13, 2025
033c1aa
feat: Add destructive operation warning for RELOAD_INITIAL_SOURCE_DAT…
devin-ai-integration[bot] Aug 13, 2025
f53cf29
tidy (wip)
aaronsteers Aug 17, 2025
6 changes: 6 additions & 0 deletions airbyte/__init__.py
@@ -131,6 +131,7 @@
from airbyte.datasets import CachedDataset
from airbyte.destinations.base import Destination
from airbyte.destinations.util import get_destination
from airbyte.lakes import GCSLakeStorage, LakeStorage, S3LakeStorage
from airbyte.records import StreamRecord
from airbyte.results import ReadResult, WriteResult
from airbyte.secrets import SecretSourceEnum, get_secret
@@ -154,6 +155,7 @@
documents,
exceptions, # noqa: ICN001 # No 'exc' alias for top-level module
experimental,
lakes,
logs,
mcp,
records,
@@ -175,6 +177,7 @@
"documents",
"exceptions",
"experimental",
"lakes",
"logs",
"mcp",
"records",
@@ -195,7 +198,10 @@
"CachedDataset",
"Destination",
"DuckDBCache",
"GCSLakeStorage",
"LakeStorage",
"ReadResult",
"S3LakeStorage",
"SecretSourceEnum",
"Source",
"StreamRecord",
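With these exports in place, the new lake classes should be importable directly from the top-level package; a minimal sketch using only the names added in this diff:

import airbyte as ab
from airbyte import GCSLakeStorage, LakeStorage, S3LakeStorage

# The lakes submodule is also exposed, e.g. for types like FastUnloadResult.
from airbyte import lakes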
176 changes: 176 additions & 0 deletions airbyte/caches/base.py
@@ -20,6 +20,10 @@
from airbyte.caches._state_backend import SqlStateBackend
from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE, TEMP_FILE_CLEANUP
from airbyte.datasets._sql import CachedDataset


if TYPE_CHECKING:
from airbyte.lakes import FastUnloadResult, LakeStorage
from airbyte.shared.catalog_providers import CatalogProvider
from airbyte.shared.sql_processor import SqlConfig
from airbyte.shared.state_writers import StdOutStateWriter
@@ -74,6 +78,7 @@ def paired_destination_config(self) -> Any | dict[str, Any]: # noqa: ANN401 #
"configuration."
)

@final
def __init__(self, **data: Any) -> None: # noqa: ANN401
"""Initialize the cache and backends."""
super().__init__(**data)
@@ -107,6 +112,7 @@ def __init__(self, **data: Any) -> None: # noqa: ANN401
temp_file_cleanup=self.cleanup,
)

@final
@property
def config_hash(self) -> str | None:
"""Return a hash of the cache configuration.
@@ -115,6 +121,7 @@ def config_hash(self) -> str | None:
"""
return super(SqlConfig, self).config_hash

@final
def execute_sql(self, sql: str | list[str]) -> None:
"""Execute one or more SQL statements against the cache's SQL backend.

@@ -145,6 +152,7 @@ def processor(self) -> SqlProcessorBase:
"""Return the SQL processor instance."""
return self._read_processor

@final
def get_record_processor(
self,
source_name: str,
@@ -178,6 +186,7 @@ def get_record_processor(

# Read methods:

@final
def get_records(
self,
stream_name: str,
@@ -251,6 +260,7 @@ def __bool__(self) -> bool:
"""
return True

@final
def get_state_provider(
self,
source_name: str,
@@ -266,6 +276,7 @@ def get_state_provider(
destination_name=destination_name,
)

@final
def get_state_writer(
self,
source_name: str,
@@ -281,6 +292,7 @@ def get_state_writer(
destination_name=destination_name,
)

@final
def register_source(
self,
source_name: str,
@@ -294,6 +306,7 @@ def register_source(
incoming_stream_names=stream_names,
)

@final
def create_source_tables(
self,
source: Source,
@@ -330,20 +343,24 @@ def create_source_tables(
create_if_missing=True,
)

@final
def __getitem__(self, stream: str) -> CachedDataset:
"""Return a dataset by stream name."""
return self.streams[stream]

@final
def __contains__(self, stream: str) -> bool:
"""Return whether a stream is in the cache."""
return stream in (self._catalog_backend.stream_names)

@final
def __iter__( # type: ignore [override] # Overriding Pydantic model method
self,
) -> Iterator[tuple[str, Any]]:
"""Iterate over the streams in the cache."""
return ((name, dataset) for name, dataset in self.streams.items())

@final
def _write_airbyte_message_stream(
self,
stdin: IO[str] | AirbyteMessageIterator,
@@ -365,3 +382,162 @@ def _write_airbyte_message_stream(
progress_tracker=progress_tracker,
)
progress_tracker.log_cache_processing_complete()

@final
def fast_unload_streams(
self,
lake_store: LakeStorage,
*,
streams: list[str] | Literal["*"] | None = None,
) -> list[FastUnloadResult]:
"""Unload the cache to a lake store.

We dump data directly to parquet files in the lake store.

Args:
streams: The streams to unload. If None, unload all streams.
lake_store: The lake store to unload to. If None, use the default lake store.
"""
stream_names: list[str]
if streams == "*" or streams is None:
stream_names = self._catalog_backend.stream_names
elif isinstance(streams, list):
stream_names = streams

return [
self.fast_unload_stream(stream_name, lake_store)
for stream_name in stream_names
]

@final
def fast_unload_stream(
self,
stream_name: str,
lake_store: LakeStorage,
**kwargs,
) -> FastUnloadResult:
"""Unload a single stream to the lake store.

This generic implementation delegates to `fast_unload_table()`
which subclasses should override for database-specific fast operations.
"""
if not hasattr(self, "fast_unload_table"):
raise NotImplementedError("Subclasses must implement `fast_unload_table()` method")

sql_table = self.streams[stream_name].to_sql_table()
table_name = sql_table.name

🛠️ Refactor suggestion

hasattr check always passes – subclasses still allowed to omit an override

CacheBase itself defines fast_unload_table, so hasattr(self, "fast_unload_table") is always True.
Would you switch to comparing the underlying function object instead?

if self.fast_unload_table.__func__ is CacheBase.fast_unload_table:
    raise NotImplementedError("Subclass must override fast_unload_table()")

This guarantees the override is present – wdyt?

🤖 Prompt for AI Agents
In airbyte/caches/base.py around lines 428 to 433, the current hasattr check for
fast_unload_table always passes because CacheBase defines it, allowing
subclasses to omit overriding it. Replace the hasattr check with a comparison of
the underlying function objects by checking if self.fast_unload_table.__func__
is CacheBase.fast_unload_table, and raise NotImplementedError if they are the
same. This ensures subclasses must override the method.
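For illustration, a self-contained sketch of the override-detection pattern suggested above, independent of the PyAirbyte classes (names here are hypothetical):

class Base:
    def fast_unload_table(self) -> None:
        raise NotImplementedError

    def fast_unload_stream(self) -> None:
        # Compare the bound method's underlying function to the base-class
        # definition to detect a missing override before doing any work.
        if self.fast_unload_table.__func__ is Base.fast_unload_table:
            raise NotImplementedError("Subclass must override fast_unload_table()")
        self.fast_unload_table()


class WithOverride(Base):
    def fast_unload_table(self) -> None:
        print("unloading...")


class WithoutOverride(Base):
    pass


WithOverride().fast_unload_stream()     # prints "unloading..."
WithoutOverride().fast_unload_stream()  # raises NotImplementedError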

return self.fast_unload_table(
stream_name=stream_name,
table_name=table_name,
lake_store=lake_store,
lake_path_prefix=stream_name,
**kwargs,
)

def fast_unload_table(
self,
table_name: str,
lake_store: LakeStorage,
*,
stream_name: str | None = None,
db_name: str | None = None,
schema_name: str | None = None,
lake_path_prefix: str | None = None,
) -> FastUnloadResult:
"""Fast-unload a specific table to the designated lake storage.

Subclasses should override this method to implement fast unloads.
"""
raise NotImplementedError

@final
def fast_load_streams(
self,
lake_store: LakeStorage,
*,
streams: list[str],
) -> None:
"""Unload the cache to a lake store.

We dump data directly to parquet files in the lake store.

Args:
streams: The streams to unload. If None, unload all streams.
lake_store: The lake store to unload to. If None, use the default lake store.
"""
for stream_name in streams:
self.fast_load_stream(
stream_name,
lake_store,
lake_path_prefix=stream_name,
)

@final
def fast_load_stream(
self,
stream_name: str,
lake_store: LakeStorage,
lake_path_prefix: str,
*,
zero_copy: bool = False,
) -> None:
"""Load a single stream from the lake store using fast native LOAD operations."""
sql_table = self.streams[stream_name].to_sql_table()
table_name = sql_table.name

if zero_copy:
raise NotImplementedError("Zero-copy loading is not yet supported in Snowflake.")

self.fast_load_table(
table_name=table_name,
lake_store=lake_store,
lake_path_prefix=lake_path_prefix,
zero_copy=zero_copy,
)

def fast_load_table(
self,
table_name: str,
lake_store: LakeStorage,
lake_path_prefix: str,
*,
db_name: str | None = None,
schema_name: str | None = None,
) -> None:
"""Fast-unload a specific table to the designated lake storage.

Subclasses should override this method to implement fast unloads.
"""
raise NotImplementedError

@final
def fast_load_stream_from_unload_result(
self,
stream_name: str,
unload_result: FastUnloadResult,
*,
zero_copy: bool = False,
) -> None:
"""Load the result of a fast unload operation."""
self.fast_load_stream(
stream_name=stream_name,
lake_store=unload_result.lake_store,
lake_path_prefix=unload_result.lake_path_prefix,
zero_copy=zero_copy,
)

@final
def fast_load_table_from_unload_result(
self,
table_name: str,
unload_result: FastUnloadResult,
*,
zero_copy: bool = False,
) -> None:
"""Load the result of a fast unload operation."""
self.fast_load_table(
table_name=table_name,
lake_store=unload_result.lake_store,
lake_path_prefix=unload_result.lake_path_prefix,
zero_copy=zero_copy,
)
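
Taken together, a minimal round-trip sketch of the API added above (the cache and lake-store setup are assumptions for illustration; only the fast_unload_streams() and fast_load_stream_from_unload_result() calls mirror the methods defined in this diff):

import airbyte as ab

# Illustrative setup: in practice, use a cache whose backend overrides
# fast_unload_table()/fast_load_table() (Snowflake, per this PR's commits).
# The S3LakeStorage constructor arguments below are placeholders, not a
# confirmed signature.
cache = ab.get_default_cache()
lake = ab.S3LakeStorage(
    bucket_name="my-example-bucket",
    region="us-west-2",
)

# Unload every stream in the cache to Parquet files in the lake store.
unload_results = cache.fast_unload_streams(lake_store=lake, streams="*")

# Reload each stream from its unload result. The unload step sets
# lake_path_prefix to the stream name, so it doubles as the identifier here.
for result in unload_results:
    cache.fast_load_stream_from_unload_result(
        stream_name=result.lake_path_prefix,
        unload_result=result,
    )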