Changes from 7 commits
Commits (48):
a663037 save bananza mode wip (aaronsteers, Jul 31, 2025)
68e1df6 feat: Complete bananza mode lake storage implementation (devin-ai-integration[bot], Aug 8, 2025)
d5e6713 feat: Add warehouse sizing variables and scaling analysis (devin-ai-integration[bot], Aug 8, 2025)
14457bc fix: Resolve linting and type checking issues (devin-ai-integration[bot], Aug 8, 2025)
54069f2 feat: Complete fast lake copy implementation with warehouse scaling (devin-ai-integration[bot], Aug 8, 2025)
504eb39 feat: Add comprehensive timestamps and elapsed time to fast lake copy… (devin-ai-integration[bot], Aug 8, 2025)
9a69509 feat: Scale faker data to 10 million rows and process only purchases … (devin-ai-integration[bot], Aug 8, 2025)
8db85a6 fix: Add force_full_refresh=True to ensure all 10M records are processed (devin-ai-integration[bot], Aug 8, 2025)
65d1628 feat: Add detailed performance metrics with records/s and MB/s for ea… (devin-ai-integration[bot], Aug 8, 2025)
386f254 feat: Scale to 50 million records and add write_strategy=replace para… (devin-ai-integration[bot], Aug 8, 2025)
4d70666 feat: Configure second run with 2XLARGE warehouse and skip initial da… (devin-ai-integration[bot], Aug 8, 2025)
36f9fa8 fix: Add S3 eventual consistency delay and increase file descriptor l… (devin-ai-integration[bot], Aug 8, 2025)
158572a fix: Use COMPUTE_WH_LARGE instead of non-existent COMPUTE_WH_2XLARGE … (devin-ai-integration[bot], Aug 8, 2025)
da22d77 feat: Update to COMPUTE_WH_2XLARGE warehouse and add Snowflake CPU mi… (devin-ai-integration[bot], Aug 8, 2025)
57a3376 feat: Update warehouse configuration and add CPU minutes analysis (devin-ai-integration[bot], Aug 8, 2025)
d1779f3 feat: Configure COMPUTE_WH_2XLARGE warehouse for 32x performance test (devin-ai-integration[bot], Aug 8, 2025)
e0f4375 feat: Switch to co-located S3 bucket in US West 2 to match Snowflake … (devin-ai-integration[bot], Aug 8, 2025)
9a9a23d fix: Use existing accessible S3 bucket ab-destiantion-iceberg-us-west-2 (devin-ai-integration[bot], Aug 8, 2025)
a1c1c0c feat: Add unload_table_to_lake() method for arbitrary table unloads (devin-ai-integration[bot], Aug 8, 2025)
cbbf530 refactor: Make unload_stream_to_lake() call unload_table_to_lake() to… (devin-ai-integration[bot], Aug 8, 2025)
de339fd fix: Shorten parameter descriptions to fix line length linting issues (devin-ai-integration[bot], Aug 8, 2025)
9fe4829 feat: Remove arrow-based write_dataset() and read_dataset() methods f… (devin-ai-integration[bot], Aug 8, 2025)
16f5226 refactor: Make unload_stream_to_lake() generic in base class, move cu… (devin-ai-integration[bot], Aug 8, 2025)
0f182f3 tidy up implementation (aaronsteers, Aug 8, 2025)
bc415da add FastUnloadResultObject (aaronsteers, Aug 8, 2025)
5471786 add ability to load from an unload result (aaronsteers, Aug 8, 2025)
8dff561 fix: Resolve import errors and bugs preventing fast lake copy script … (devin-ai-integration[bot], Aug 8, 2025)
c0bc120 toggle on reload, expand to 100MM sample records (aaronsteers, Aug 12, 2025)
30664de fix: source config was not using new constant (aaronsteers, Aug 12, 2025)
34d4351 remove nonsense metric (aaronsteers, Aug 12, 2025)
297674d feat: Toggle RELOAD_INITIAL_SOURCE_DATA to False after 100MM dataset … (devin-ai-integration[bot], Aug 12, 2025)
f6cc1ea feat: Implement robust COPY INTO metadata capture using RESULT_SCAN() (devin-ai-integration[bot], Aug 12, 2025)
6b67ed1 feat: Enhance FastUnloadResult with actual record counts from RESULT_… (devin-ai-integration[bot], Aug 12, 2025)
cf50d64 fix: Add noqa comment for necessary SQLAlchemy _mapping access (devin-ai-integration[bot], Aug 12, 2025)
68c5fff feat: Add files count and manifest display to fast lake copy example … (devin-ai-integration[bot], Aug 12, 2025)
597c1a5 fix: Remove unnecessary f-string prefixes from static print statements (devin-ai-integration[bot], Aug 12, 2025)
53bb076 Merge branch 'aj/feat/bananza-mode-replication' of https://github.com… (aaronsteers, Aug 13, 2025)
14b468c feat: Add multi-warehouse performance analysis with timestamped S3 paths (devin-ai-integration[bot], Aug 13, 2025)
103ecc7 Merge branch 'aj/feat/bananza-mode-replication' of https://git-manage… (devin-ai-integration[bot], Aug 13, 2025)
6799539 feat: Remove cost efficiency and scaling efficiency tables from perfo… (devin-ai-integration[bot], Aug 13, 2025)
d38423a feat: Implement FastLoadResult class with Snowflake COPY INTO metadat… (devin-ai-integration[bot], Aug 13, 2025)
47aa505 fix: Update table qualification logic in Snowflake fast_unload_table … (devin-ai-integration[bot], Aug 13, 2025)
836d932 feat: Add FastLoadResult validation and test scripts (devin-ai-integration[bot], Aug 13, 2025)
5d8ae55 feat: Add debug logging to compare unload vs load file processing (devin-ai-integration[bot], Aug 13, 2025)
435e72d feat: Enable debug logging with smaller dataset for load timeout debu… (devin-ai-integration[bot], Aug 13, 2025)
3ea679a fix: Prepare script for 100M dataset reload after accidental deletion (devin-ai-integration[bot], Aug 13, 2025)
033c1aa feat: Add destructive operation warning for RELOAD_INITIAL_SOURCE_DAT… (devin-ai-integration[bot], Aug 13, 2025)
f53cf29 tidy (wip) (aaronsteers, Aug 17, 2025)
6 changes: 6 additions & 0 deletions airbyte/__init__.py
@@ -131,6 +131,7 @@
from airbyte.datasets import CachedDataset
from airbyte.destinations.base import Destination
from airbyte.destinations.util import get_destination
from airbyte.lakes import GCSLakeStorage, LakeStorage, S3LakeStorage
from airbyte.records import StreamRecord
from airbyte.results import ReadResult, WriteResult
from airbyte.secrets import SecretSourceEnum, get_secret
@@ -154,6 +155,7 @@
documents,
exceptions, # noqa: ICN001 # No 'exc' alias for top-level module
experimental,
lakes,
logs,
mcp,
records,
@@ -175,6 +177,7 @@
"documents",
"exceptions",
"experimental",
"lakes",
"logs",
"mcp",
"records",
@@ -195,7 +198,10 @@
"CachedDataset",
"Destination",
"DuckDBCache",
"GCSLakeStorage",
"LakeStorage",
"ReadResult",
"S3LakeStorage",
"SecretSourceEnum",
"Source",
"StreamRecord",
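Reviewer note: the net effect of the airbyte/__init__.py changes above is that the lake classes become importable from the top-level package. A minimal sketch of the new imports (only names visible in this hunk are used; constructor signatures live in airbyte/lakes.py, which is not part of this diff):

# Sketch only: demonstrates the new top-level exports added by this hunk.
import airbyte as ab
from airbyte import GCSLakeStorage, LakeStorage, S3LakeStorage  # newly exported classes
from airbyte import lakes  # the submodule itself is also exposed for qualified access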
74 changes: 74 additions & 0 deletions airbyte/caches/base.py
@@ -20,6 +20,10 @@
from airbyte.caches._state_backend import SqlStateBackend
from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE, TEMP_FILE_CLEANUP
from airbyte.datasets._sql import CachedDataset


if TYPE_CHECKING:
from airbyte.lakes import LakeStorage
from airbyte.shared.catalog_providers import CatalogProvider
from airbyte.shared.sql_processor import SqlConfig
from airbyte.shared.state_writers import StdOutStateWriter
@@ -365,3 +369,73 @@ def _write_airbyte_message_stream(
progress_tracker=progress_tracker,
)
progress_tracker.log_cache_processing_complete()

def fast_unload(
self,
lake_store: LakeStorage,
*,
streams: list[str] | Literal["*"] | None = None,
) -> None:
"""Unload the cache to a lake store.

Data is written directly to Parquet files in the lake store.

Args:
streams: The streams to unload. Pass "*" or None to unload all streams.
lake_store: The lake store to unload to.
"""
stream_names: list[str]
if streams == "*" or streams is None:
stream_names = self._catalog_backend.stream_names
elif isinstance(streams, list):
stream_names = streams

for stream_name in stream_names:
self._unload_stream_to_lake_store(
stream_name,
lake_store,
)

def _unload_stream_to_lake_store(
self,
stream_name: str,
lake_store: LakeStorage,
) -> None:
"""Unload a single stream to the lake store.

This generic implementation delegates to the `lake_store`, passing it
an Arrow dataset for the stream.

Subclasses can override this method to provide a faster
unload implementation.
"""
arrow_dataset = self.get_arrow_dataset(stream_name)
lake_store.write_dataset(
dataset=arrow_dataset,
table_name=stream_name,
schema=self.schema_name,
cache_dir=self.cache_dir,
cleanup=self.cleanup,
)

def _load_stream_from_lake_store(
self,
stream_name: str,
lake_store: LakeStorage,
) -> None:
"""Load a single stream from the lake store.

This generic implementation reads an Arrow dataset from the lake store
and writes it to the cache.

Subclasses can override this method to provide a faster
load implementation.
"""
_ = lake_store.read_dataset(
table_name=stream_name,
schema=self.schema_name,
cache_dir=self.cache_dir,
cleanup=self.cleanup,
)
# self.processor.write_arrow_dataset(arrow_dataset, stream_name)
raise NotImplementedError("Loading from lake store to cache is not yet implemented")
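The generic base-class path above implies a particular LakeStorage surface. The following sketch is inferred only from the call sites in this PR (base, BigQuery, and Snowflake caches); the real definitions live in airbyte/lakes.py and may differ, and a later commit in this PR (9fe4829) removes the Arrow-based write_dataset()/read_dataset() methods, so this reflects the 7-commit snapshot shown here.

# Inferred sketch, not the actual airbyte.lakes definitions.
from __future__ import annotations

from pathlib import Path  # exact cache_dir type is not visible in this diff; Path assumed
from typing import Protocol

import pyarrow.dataset as ds


class LakeStorageLike(Protocol):
    """Methods and attributes the cache-layer code in this PR calls on a lake store."""

    root_storage_uri: str  # used in the Snowflake CREATE STAGE statement
    bucket_name: str  # only on S3/GCS stores; the BigQuery methods check for it with hasattr()

    def get_stream_root_uri(self, stream_name: str) -> str:
        """Root URI for a stream's files (BigQuery EXPORT DATA / LOAD DATA)."""
        ...

    def get_artifact_prefix(self) -> str:
        """Prefix for named Snowflake artifacts (file format, stage)."""
        ...

    def write_dataset(
        self,
        *,
        dataset: ds.Dataset,
        table_name: str,
        schema: str,
        cache_dir: Path,
        cleanup: bool,
    ) -> None:
        """Write an Arrow dataset to the lake (generic unload path)."""
        ...

    def read_dataset(
        self,
        *,
        table_name: str,
        schema: str,
        cache_dir: Path,
        cleanup: bool,
    ) -> ds.Dataset:
        """Read an Arrow dataset back from the lake (generic load path)."""
        ...

Given any such store, fast_unload() resolves the stream list (all catalog streams for "*" or None) and calls _unload_stream_to_lake_store() once per stream.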
63 changes: 63 additions & 0 deletions airbyte/caches/bigquery.py
@@ -31,6 +31,10 @@
)


if TYPE_CHECKING:
from airbyte.lakes import LakeStorage


if TYPE_CHECKING:
from airbyte.shared.sql_processor import SqlProcessorBase

@@ -63,6 +67,65 @@ def get_arrow_dataset(
"Please consider using a different cache implementation for these functionalities."
)

def unload_stream_to_lake(
self,
stream_name: str,
lake_store: LakeStorage,
) -> None:
"""Unload a single stream to the lake store using BigQuery EXPORT DATA.

This implementation uses BigQuery's native EXPORT DATA functionality
to write directly to GCS, bypassing the Arrow dataset limitation.
"""
sql_table = self.streams[stream_name].to_sql_table()
table_name = sql_table.name

if not hasattr(lake_store, "bucket_name"):
raise NotImplementedError("BigQuery unload currently only supports GCS lake storage")

export_uri = f"{lake_store.get_stream_root_uri(stream_name)}*.parquet"

export_statement = f"""
EXPORT DATA OPTIONS(
uri='{export_uri}',
format='PARQUET',
overwrite=true
) AS
SELECT * FROM {self._read_processor.sql_config.schema_name}.{table_name}
"""

self.execute_sql(export_statement)

def load_stream_from_lake(
self,
stream_name: str,
lake_store: LakeStorage,
*,
zero_copy: bool = False, # noqa: ARG002
) -> None:
"""Load a single stream from the lake store using BigQuery LOAD DATA.

This implementation uses BigQuery's native LOAD DATA functionality
to read directly from GCS, bypassing the Arrow dataset limitation.
"""
sql_table = self.streams[stream_name].to_sql_table()
table_name = sql_table.name

if not hasattr(lake_store, "bucket_name"):
raise NotImplementedError("BigQuery load currently only supports GCS lake storage")

source_uri = f"{lake_store.get_stream_root_uri(stream_name)}*.parquet"

load_statement = f"""
LOAD DATA INTO {self._read_processor.sql_config.schema_name}.{table_name}
FROM FILES (
format = 'PARQUET',
uris = ['{source_uri}']
)
"""

self.execute_sql(load_statement)


# Expose the Cache class and also the Config class.
__all__ = [
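As a usage sketch, the BigQuery methods above could be exercised roughly as follows. The BigQueryCache field names and the GCSLakeStorage constructor are assumptions for illustration; only the unload_stream_to_lake() / load_stream_from_lake() signatures and the rendered SQL shapes come from this diff.

# Hypothetical round trip between BigQuery and GCS; constructor arguments are assumed.
from airbyte.caches.bigquery import BigQueryCache
from airbyte.lakes import GCSLakeStorage

cache = BigQueryCache(
    project_name="my-gcp-project",  # assumed cache config fields
    dataset_name="pyairbyte_cache",
    credentials_path="/path/to/service_account.json",
)
lake = GCSLakeStorage(bucket_name="my-lake-bucket")  # assumed constructor

# Assumes the "purchases" stream was previously written to this cache
# (both methods look up the table via self.streams[stream_name]).

# Renders an EXPORT DATA statement targeting get_stream_root_uri("purchases") + "*.parquet"
# with format='PARQUET' and overwrite=true.
cache.unload_stream_to_lake("purchases", lake)

# Renders LOAD DATA INTO <schema>.purchases FROM FILES (format='PARQUET', uris=[...]).
cache.load_stream_from_lake("purchases", lake)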
127 changes: 126 additions & 1 deletion airbyte/caches/snowflake.py
@@ -59,7 +59,7 @@

from __future__ import annotations

from typing import ClassVar
from typing import TYPE_CHECKING, ClassVar

from airbyte_api.models import DestinationSnowflake

@@ -68,6 +68,11 @@
from airbyte.destinations._translate_cache_to_dest import (
snowflake_cache_to_destination_configuration,
)


if TYPE_CHECKING:
from airbyte.lakes import LakeStorage
from airbyte.secrets.util import get_secret
from airbyte.shared.sql_processor import RecordDedupeMode, SqlProcessorBase


@@ -86,6 +91,126 @@ def paired_destination_config(self) -> DestinationSnowflake:
"""Return a dictionary of destination configuration values."""
return snowflake_cache_to_destination_configuration(cache=self)

def unload_stream_to_lake(
self,
stream_name: str,
lake_store: LakeStorage,
*,
aws_access_key_id: str | None = None,
aws_secret_access_key: str | None = None,
) -> None:
"""Unload a single stream to the lake store using Snowflake COPY INTO.

This implementation uses Snowflake's COPY INTO command to unload data
directly to S3 in Parquet format with managed artifacts for optimal performance.

Args:
stream_name: The name of the stream to unload.
lake_store: The lake store to unload to.
aws_access_key_id: AWS access key ID. If not provided, it is resolved via get_secret().
aws_secret_access_key: AWS secret access key. If not provided, it is resolved via get_secret().
"""
sql_table = self.streams[stream_name].to_sql_table()
table_name = sql_table.name

if aws_access_key_id is None:
aws_access_key_id = get_secret("AWS_ACCESS_KEY_ID")
if aws_secret_access_key is None:
aws_secret_access_key = get_secret("AWS_SECRET_ACCESS_KEY")

artifact_prefix = lake_store.get_artifact_prefix()
file_format_name = f"{artifact_prefix}PARQUET_FORMAT"
create_format_sql = f"""
CREATE FILE FORMAT IF NOT EXISTS {file_format_name}
TYPE = PARQUET
COMPRESSION = SNAPPY
"""
self.execute_sql(create_format_sql)

stage_name = f"{artifact_prefix}STAGE"
create_stage_sql = f"""
CREATE OR REPLACE STAGE {stage_name}
URL = '{lake_store.root_storage_uri}'
CREDENTIALS = (
AWS_KEY_ID = '{aws_access_key_id}'
AWS_SECRET_KEY = '{aws_secret_access_key}'
)
FILE_FORMAT = {file_format_name}
"""
self.execute_sql(create_stage_sql)

unload_statement = f"""
COPY INTO @{stage_name}/{stream_name}/
FROM {self._read_processor.sql_config.schema_name}.{table_name}
FILE_FORMAT = {file_format_name}
OVERWRITE = TRUE
"""
self.execute_sql(unload_statement)

def load_stream_from_lake(
self,
stream_name: str,
lake_store: LakeStorage,
*,
zero_copy: bool = False,
aws_access_key_id: str | None = None,
aws_secret_access_key: str | None = None,
) -> None:
"""Load a single stream from the lake store using Snowflake COPY INTO.

This implementation uses Snowflake's COPY INTO command to load data
directly from S3 in Parquet format with managed artifacts for optimal performance.

Args:
stream_name: The name of the stream to load.
lake_store: The lake store to load from.
zero_copy: Whether to use zero-copy loading. If True, the data will be
loaded without copying it to the cache. This is useful for large datasets
that don't need to be stored in the cache.
aws_access_key_id: AWS access key ID. If not provided, it is resolved via get_secret().
aws_secret_access_key: AWS secret access key. If not provided, it is resolved via get_secret().
"""
sql_table = self.streams[stream_name].to_sql_table()
table_name = sql_table.name

if zero_copy:
raise NotImplementedError("Zero-copy loading is not yet supported in Snowflake.")

if aws_access_key_id is None:
aws_access_key_id = get_secret("AWS_ACCESS_KEY_ID")
if aws_secret_access_key is None:
aws_secret_access_key = get_secret("AWS_SECRET_ACCESS_KEY")

artifact_prefix = lake_store.get_artifact_prefix()
file_format_name = f"{artifact_prefix}PARQUET_FORMAT"
create_format_sql = f"""
CREATE FILE FORMAT IF NOT EXISTS {file_format_name}
TYPE = PARQUET
COMPRESSION = SNAPPY
"""
self.execute_sql(create_format_sql)

stage_name = f"{artifact_prefix}STAGE"
create_stage_sql = f"""
CREATE OR REPLACE STAGE {stage_name}
URL = '{lake_store.root_storage_uri}'
CREDENTIALS = (
AWS_KEY_ID = '{aws_access_key_id}'
AWS_SECRET_KEY = '{aws_secret_access_key}'
)
FILE_FORMAT = {file_format_name}
"""
self.execute_sql(create_stage_sql)

load_statement = f"""
COPY INTO {self._read_processor.sql_config.schema_name}.{table_name}
FROM @{stage_name}/{stream_name}/
FILE_FORMAT = {file_format_name}
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
PURGE = FALSE
"""
self.execute_sql(load_statement)


# Expose the Cache class and also the Config class.
__all__ = [
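A corresponding usage sketch for the Snowflake path. The SnowflakeCache fields and the S3LakeStorage constructor are assumptions for illustration; the method signatures, the secret fallbacks, and the COPY INTO behaviour described in the comments come from this diff.

# Hypothetical Snowflake <-> S3 round trip; constructor arguments are assumed.
import airbyte as ab
from airbyte.caches.snowflake import SnowflakeCache
from airbyte.lakes import S3LakeStorage

cache = SnowflakeCache(
    account="my-account",  # assumed/typical SnowflakeCache fields
    username="MY_USER",
    password=ab.get_secret("SNOWFLAKE_PASSWORD"),
    warehouse="COMPUTE_WH",
    database="MY_DB",
    role="MY_ROLE",
    schema_name="PYAIRBYTE",
)
lake = S3LakeStorage(bucket_name="my-lake-bucket")  # assumed constructor

# Creates <prefix>PARQUET_FORMAT and <prefix>STAGE, then runs
# COPY INTO @<prefix>STAGE/purchases/ FROM <schema>.purchases (Parquet, SNAPPY, OVERWRITE).
# AWS credentials fall back to the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY secrets.
cache.unload_stream_to_lake("purchases", lake)

# COPY INTO <schema>.purchases FROM @<prefix>STAGE/purchases/
# with MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE; zero_copy=True raises NotImplementedError.
cache.load_stream_from_lake("purchases", lake, zero_copy=False)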