Commits (48)
a663037
save bananza mode wip
aaronsteers Jul 31, 2025
68e1df6
feat: Complete bananza mode lake storage implementation
devin-ai-integration[bot] Aug 8, 2025
d5e6713
feat: Add warehouse sizing variables and scaling analysis
devin-ai-integration[bot] Aug 8, 2025
14457bc
fix: Resolve linting and type checking issues
devin-ai-integration[bot] Aug 8, 2025
54069f2
feat: Complete fast lake copy implementation with warehouse scaling
devin-ai-integration[bot] Aug 8, 2025
504eb39
feat: Add comprehensive timestamps and elapsed time to fast lake copy…
devin-ai-integration[bot] Aug 8, 2025
9a69509
feat: Scale faker data to 10 million rows and process only purchases …
devin-ai-integration[bot] Aug 8, 2025
8db85a6
fix: Add force_full_refresh=True to ensure all 10M records are processed
devin-ai-integration[bot] Aug 8, 2025
65d1628
feat: Add detailed performance metrics with records/s and MB/s for ea…
devin-ai-integration[bot] Aug 8, 2025
386f254
feat: Scale to 50 million records and add write_strategy=replace para…
devin-ai-integration[bot] Aug 8, 2025
4d70666
feat: Configure second run with 2XLARGE warehouse and skip initial da…
devin-ai-integration[bot] Aug 8, 2025
36f9fa8
fix: Add S3 eventual consistency delay and increase file descriptor l…
devin-ai-integration[bot] Aug 8, 2025
158572a
fix: Use COMPUTE_WH_LARGE instead of non-existent COMPUTE_WH_2XLARGE …
devin-ai-integration[bot] Aug 8, 2025
da22d77
feat: Update to COMPUTE_WH_2XLARGE warehouse and add Snowflake CPU mi…
devin-ai-integration[bot] Aug 8, 2025
57a3376
feat: Update warehouse configuration and add CPU minutes analysis
devin-ai-integration[bot] Aug 8, 2025
d1779f3
feat: Configure COMPUTE_WH_2XLARGE warehouse for 32x performance test
devin-ai-integration[bot] Aug 8, 2025
e0f4375
feat: Switch to co-located S3 bucket in US West 2 to match Snowflake …
devin-ai-integration[bot] Aug 8, 2025
9a9a23d
fix: Use existing accessible S3 bucket ab-destiantion-iceberg-us-west-2
devin-ai-integration[bot] Aug 8, 2025
a1c1c0c
feat: Add unload_table_to_lake() method for arbitrary table unloads
devin-ai-integration[bot] Aug 8, 2025
cbbf530
refactor: Make unload_stream_to_lake() call unload_table_to_lake() to…
devin-ai-integration[bot] Aug 8, 2025
de339fd
fix: Shorten parameter descriptions to fix line length linting issues
devin-ai-integration[bot] Aug 8, 2025
9fe4829
feat: Remove arrow-based write_dataset() and read_dataset() methods f…
devin-ai-integration[bot] Aug 8, 2025
16f5226
refactor: Make unload_stream_to_lake() generic in base class, move cu…
devin-ai-integration[bot] Aug 8, 2025
0f182f3
tidy up implementation
aaronsteers Aug 8, 2025
bc415da
add FastUnloadResultObject
aaronsteers Aug 8, 2025
5471786
add ability to load from an unload result
aaronsteers Aug 8, 2025
8dff561
fix: Resolve import errors and bugs preventing fast lake copy script …
devin-ai-integration[bot] Aug 8, 2025
c0bc120
toggle on reload, expand to 100MM sample records
aaronsteers Aug 12, 2025
30664de
fix: source config was not using new constant
aaronsteers Aug 12, 2025
34d4351
remove nonsense metric
aaronsteers Aug 12, 2025
297674d
feat: Toggle RELOAD_INITIAL_SOURCE_DATA to False after 100MM dataset …
devin-ai-integration[bot] Aug 12, 2025
f6cc1ea
feat: Implement robust COPY INTO metadata capture using RESULT_SCAN()
devin-ai-integration[bot] Aug 12, 2025
6b67ed1
feat: Enhance FastUnloadResult with actual record counts from RESULT_…
devin-ai-integration[bot] Aug 12, 2025
cf50d64
fix: Add noqa comment for necessary SQLAlchemy _mapping access
devin-ai-integration[bot] Aug 12, 2025
68c5fff
feat: Add files count and manifest display to fast lake copy example …
devin-ai-integration[bot] Aug 12, 2025
597c1a5
fix: Remove unnecessary f-string prefixes from static print statements
devin-ai-integration[bot] Aug 12, 2025
53bb076
Merge branch 'aj/feat/bananza-mode-replication' of https://github.com…
aaronsteers Aug 13, 2025
14b468c
feat: Add multi-warehouse performance analysis with timestamped S3 paths
devin-ai-integration[bot] Aug 13, 2025
103ecc7
Merge branch 'aj/feat/bananza-mode-replication' of https://git-manage…
devin-ai-integration[bot] Aug 13, 2025
6799539
feat: Remove cost efficiency and scaling efficiency tables from perfo…
devin-ai-integration[bot] Aug 13, 2025
d38423a
feat: Implement FastLoadResult class with Snowflake COPY INTO metadat…
devin-ai-integration[bot] Aug 13, 2025
47aa505
fix: Update table qualification logic in Snowflake fast_unload_table …
devin-ai-integration[bot] Aug 13, 2025
836d932
feat: Add FastLoadResult validation and test scripts
devin-ai-integration[bot] Aug 13, 2025
5d8ae55
feat: Add debug logging to compare unload vs load file processing
devin-ai-integration[bot] Aug 13, 2025
435e72d
feat: Enable debug logging with smaller dataset for load timeout debu…
devin-ai-integration[bot] Aug 13, 2025
3ea679a
fix: Prepare script for 100M dataset reload after accidental deletion
devin-ai-integration[bot] Aug 13, 2025
033c1aa
feat: Add destructive operation warning for RELOAD_INITIAL_SOURCE_DAT…
devin-ai-integration[bot] Aug 13, 2025
f53cf29
tidy (wip)
aaronsteers Aug 17, 2025
70 changes: 70 additions & 0 deletions airbyte/caches/base.py
@@ -20,6 +20,7 @@
from airbyte.caches._state_backend import SqlStateBackend
from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE, TEMP_FILE_CLEANUP
from airbyte.datasets._sql import CachedDataset
from airbyte.lakes import LakeStorage
from airbyte.shared.catalog_providers import CatalogProvider
from airbyte.shared.sql_processor import SqlConfig
from airbyte.shared.state_writers import StdOutStateWriter
@@ -365,3 +366,72 @@ def _write_airbyte_message_stream(
progress_tracker=progress_tracker,
)
progress_tracker.log_cache_processing_complete()

def fast_unload(
self,
lake_store: LakeStorage,
*,
streams: list[str] | Literal["*"] | None = None,
) -> None:
"""Unload the cache to a lake store.

We dump data directly to parquet files in the lake store.

Args:
lake_store: The lake store to unload to.
streams: The streams to unload. If "*" or None (the default), unload all streams.
"""
stream_names: list[str]
if streams == "*" or streams is None:
stream_names = self._catalog_backend.stream_names
elif isinstance(streams, list):
stream_names = streams
else:
raise ValueError(f"Invalid 'streams' argument: {streams!r}")

for stream_name in stream_names:
self._unload_stream_to_lake_store(
stream_name,
lake_store,
)

def _unload_stream_to_lake_store(
self,
stream_name: str,
lake_store: LakeStorage,
) -> None:
"""Unload a single stream to the lake store.

This generic implementation delegates to the `lake_store` and passes
an Arrow dataset to the lake store object.

Subclasses can override this method to provide a faster
unload implementation.
"""
arrow_dataset = self.get_arrow_dataset(stream_name)
lake_store.write_dataset(
dataset=arrow_dataset,
table_name=stream_name,
schema=self.schema_name,
cache_dir=self.cache_dir,
cleanup=self.cleanup,
)

def _load_stream_from_lake_store(
self,
stream_name: str,
lake_store: LakeStorage,
) -> None:
"""Load a single stream from the lake store.

This generic implementation reads an Arrow dataset from the lake store
and writes it to the cache.

Subclasses can override this method to provide a faster
load implementation.
"""
arrow_dataset = lake_store.read_dataset(
table_name=stream_name,
schema=self.schema_name,
cache_dir=self.cache_dir,
cleanup=self.cleanup,
)
self.processor.write_arrow_dataset(arrow_dataset, stream_name)
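
For context, a minimal usage sketch of the new fast_unload() entry point (not part of this diff; the DuckDB cache, bucket name, and placeholder values are illustrative assumptions):

import airbyte as ab
from airbyte.caches import DuckDBCache
from airbyte.lakes import S3LakeStorage

# Illustrative cache and lake store; names and values are placeholders.
cache = DuckDBCache(db_path="./example.duckdb")
lake = S3LakeStorage(
    bucket_name="example-bucket",
    region="us-west-2",
    access_key_id=ab.get_secret("AWS_ACCESS_KEY_ID"),
    secret_access_key=ab.get_secret("AWS_SECRET_ACCESS_KEY"),
)

# Unload every stream in the cache ("*" and None are equivalent here):
cache.fast_unload(lake_store=lake, streams="*")

# Or unload an explicit subset of streams:
cache.fast_unload(lake_store=lake, streams=["products"])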
68 changes: 68 additions & 0 deletions airbyte/caches/snowflake.py
@@ -68,6 +68,8 @@
from airbyte.destinations._translate_cache_to_dest import (
snowflake_cache_to_destination_configuration,
)
from airbyte.lakes import LakeStorage
from airbyte.secrets.util import get_secret
from airbyte.shared.sql_processor import RecordDedupeMode, SqlProcessorBase


@@ -86,6 +88,72 @@ def paired_destination_config(self) -> DestinationSnowflake:
"""Return a dictionary of destination configuration values."""
return snowflake_cache_to_destination_configuration(cache=self)

def unload_stream_to_lake(
self,
stream_name: str,
lake_store: LakeStorage,
) -> None:
"""Unload a single stream to the lake store.

This generic implementation delegates to the `lake_store` and passes
an Arrow dataset to the lake store object.

Subclasses can override this method to provide a faster
unload implementation.
"""
sql_table = self.streams[stream_name].to_sql_table()
table_name = sql_table.name
aws_access_key_id = get_secret("AWS_ACCESS_KEY_ID")
aws_secret_access_key = get_secret("AWS_SECRET_ACCESS_KEY")
unload_statement = "\n".join([
f"COPY INTO '{lake_store.get_stream_root_uri(stream_name)}'",
f"FROM {table_name}",
"CREDENTIALS=(",
f" AWS_KEY_ID='{aws_access_key_id}'",
f" AWS_SECRET_KEY='{aws_secret_access_key}'",
")",
"FILE_FORMAT = (TYPE = 'PARQUET')",
"OVERWRITE = TRUE",
])
self.execute_sql(unload_statement)

# To get the per-file manifest data from the COPY INTO operation:
# self.query_sql("SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))")

def load_stream_from_lake(
self,
stream_name: str,
lake_store: LakeStorage,
*,
zero_copy: bool = False,
) -> None:
"""Load a single stream from the lake store.

This generic implementation delegates to the `lake_store` and passes
an Arrow dataset to the lake store object.

Subclasses can override this method to provide a faster
unload implementation.
"""
sql_table = self.streams[stream_name].to_sql_table()
table_name = sql_table.name
aws_access_key_id = get_secret("AWS_ACCESS_KEY_ID")
aws_secret_access_key = get_secret("AWS_SECRET_ACCESS_KEY")
if zero_copy:
# Zero-copy loading is not yet supported in Snowflake.
raise NotImplementedError("Zero-copy loading is not yet supported in Snowflake.")

load_statement = "\n".join([
f"COPY INTO {table_name}",
f"FROM '{lake_store.get_stream_root_uri(stream_name)}'",
"CREDENTIALS=(",
f" AWS_KEY_ID='{aws_access_key_id}'",
f" AWS_SECRET_KEY='{aws_secret_access_key}'",
")",
"FILE_FORMAT = (TYPE = 'PARQUET')",
])
self.execute_sql(load_statement)


# Expose the Cache class and also the Config class.
__all__ = [
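
For reference, a sketch of how the COPY INTO manifest could be captured right after the unload using RESULT_SCAN(), as hinted in the comment above. The unload_with_manifest helper is hypothetical, omits the CREDENTIALS clause for brevity, and assumes the follow-up query runs in the same session:

def unload_with_manifest(cache, stream_name: str, lake_store) -> list[dict]:
    """Hypothetical helper: run the unload, then read back the per-file manifest."""
    sql_table = cache.streams[stream_name].to_sql_table()
    unload_statement = "\n".join([
        f"COPY INTO '{lake_store.get_stream_root_uri(stream_name)}'",
        f"FROM {sql_table.name}",
        "FILE_FORMAT = (TYPE = 'PARQUET')",
        "OVERWRITE = TRUE",
    ])
    cache.execute_sql(unload_statement)
    # RESULT_SCAN is a table function, so it must be wrapped in TABLE().
    # This assumes LAST_QUERY_ID() resolves against the same session/connection.
    rows = cache.query_sql("SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))")
    # Each returned row describes one unloaded file (name, size, row count).
    return [dict(row._mapping) for row in rows]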
67 changes: 67 additions & 0 deletions airbyte/lakes.py
@@ -0,0 +1,67 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""PyAirbyte LakeStorage class."""
from __future__ import annotations

import abc
from abc import abstractproperty

Copilot AI (Aug 7, 2025): The 'abstractproperty' decorator has been deprecated since Python 3.3. Use '@property' combined with '@abc.abstractmethod' instead for better compatibility and to follow current best practices.


class LakeStorage(abc.ABC):
"""PyAirbyte LakeStorage class."""

@abstractproperty

Copilot AI (Aug 7, 2025): Replace '@abstractproperty' with the '@property' and '@abc.abstractmethod' decorators. The correct pattern is '@property' stacked directly above '@abc.abstractmethod'.

def uri_protocol(self) -> str:
"""Return the URI protocol for the lake storage.
E.g. "file://", "s3://", "gcs://", etc.
"""
raise NotImplementedError("Subclasses must implement this method.")

@property
def root_storage_uri(self) -> str:
"""Get the root URI for the lake storage."""
return f"{self.uri_protocol}{self.root_storage_path}/"

@property
def root_storage_path(self) -> str:
"""Get the root path for the lake storage."""
return "airbyte/lake"

def path_to_uri(self, path: str) -> str:
"""Convert a relative lake path to a URI."""
return f"{self.root_storage_uri}{path}"

def get_stream_root_path(
self,
stream_name: str,
) -> str:
"""Get the path for a stream in the lake storage."""
return f"{self.root_storage_path}/{stream_name}/"

def get_stream_root_uri(
self,
stream_name: str,
) -> str:
"""Get the URI root for a stream in the lake storage."""
return self.path_to_uri(self.get_stream_root_path(stream_name))


class S3LakeStorage(LakeStorage):
"""S3 Lake Storage implementation."""

def __init__(self, bucket_name: str, region: str, access_key_id: str, secret_access_key: str):
"""Initialize S3LakeStorage with required parameters."""
self.bucket_name = bucket_name
self.region = region
self.access_key_id = access_key_id
self.secret_access_key = secret_access_key

@property
def uri_protocol(self) -> str:
"""Return the URI protocol for S3."""
return "s3://"

@property
def root_storage_uri(self) -> str:
"""Get the root URI for the S3 lake storage."""
return f"{self.uri_protocol}{self.bucket_name}/"
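
To make the review suggestion above concrete, a short sketch of the non-deprecated abstract-property pattern. LakeStorageSketch and FileLakeStorage are hypothetical names used only for illustration, not part of the diff:

import abc


class LakeStorageSketch(abc.ABC):
    """Sketch of an abstract URI property using @property + @abc.abstractmethod."""

    @property
    @abc.abstractmethod
    def uri_protocol(self) -> str:
        """Return the URI protocol, e.g. "s3://" or "file://"."""
        ...

    @property
    def root_storage_uri(self) -> str:
        """Get the root URI for the lake storage."""
        return f"{self.uri_protocol}airbyte/lake/"


class FileLakeStorage(LakeStorageSketch):
    """Hypothetical local-filesystem implementation, for illustration only."""

    @property
    def uri_protocol(self) -> str:
        return "file://"


print(FileLakeStorage().root_storage_uri)  # -> file://airbyte/lake/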
82 changes: 82 additions & 0 deletions examples/run_fast_lake_copy.py
@@ -0,0 +1,82 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""An example script to run a fast lake copy operation using PyAirbyte.
Usage:
poetry run python examples/run_fast_lake_copy.py
Required secrets:
- SNOWFLAKE_PASSWORD: Password for Snowflake connection.
- AWS_ACCESS_KEY_ID: AWS access key ID for S3 connection.
- AWS_SECRET_ACCESS_KEY: AWS secret access key for S3 connection.
"""
from numpy import source

Copilot AI (Aug 7, 2025): This import is incorrect and will cause a runtime error; 'source' is not exported by numpy. The line appears to be an accidental addition and should be removed.

Suggested change: delete `from numpy import source`.

import airbyte as ab
from airbyte.caches.snowflake import SnowflakeCache
from airbyte.lakes import S3LakeStorage
from airbyte.secrets.google_gsm import GoogleGSMSecretManager


AIRBYTE_INTERNAL_GCP_PROJECT = "dataline-integration-testing"
secret_mgr = GoogleGSMSecretManager(
project=AIRBYTE_INTERNAL_GCP_PROJECT,
credentials_json=ab.get_secret("GCP_GSM_CREDENTIALS"),
)

secret = secret_mgr.get_secret(
secret_name="AIRBYTE_LIB_SNOWFLAKE_CREDS",
)
assert secret is not None, "Secret not found."
secret_config = secret.parse_json()

source = ab.get_source(
"source-faker",
config={
"count": 1000,
"seed": 0,
"parallelism": 1,
"always_updated": False,
},
install_if_missing=True,
streams=["products"],
)

snowflake_cache_a = SnowflakeCache(
account=secret_config["account"],
username=secret_config["username"],
password=secret_config["password"],
database=secret_config["database"],
warehouse=secret_config["warehouse"],
role=secret_config["role"],
schema_name="test_fast_copy_source",
)
snowflake_cache_b = SnowflakeCache(
account=secret_config["account"],
username=secret_config["username"],
password=secret_config["password"],
database=secret_config["database"],
warehouse=secret_config["warehouse"],
role=secret_config["role"],
schema_name="test_fast_copy_dest",
)

s3_lake = S3LakeStorage(
bucket_name="mybucket",
region="us-west-2",
access_key_id=ab.get_secret("AWS_ACCESS_KEY_ID"),
secret_access_key=ab.get_secret("AWS_SECRET_ACCESS_KEY"),
)

# Begin processing
source.read(cache=snowflake_cache_a)

snowflake_cache_a.unload_stream_to_lake(
stream_name="products",
lake_store=s3_lake,
)

snowflake_cache_b.load_stream_from_lake(
stream_name="products",
lake_store=s3_lake,
zero_copy=False,  # Zero-copy loading is not yet supported for Snowflake (raises NotImplementedError if True).
)
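
A hedged sketch of the per-step timing instrumentation described in the commit history (elapsed time, throughput); this is not part of the diff and simply wraps the calls above:

import time

start = time.perf_counter()
snowflake_cache_a.unload_stream_to_lake(stream_name="products", lake_store=s3_lake)
unload_elapsed = time.perf_counter() - start

start = time.perf_counter()
snowflake_cache_b.load_stream_from_lake(stream_name="products", lake_store=s3_lake)
load_elapsed = time.perf_counter() - start

print(f"Unload elapsed: {unload_elapsed:.1f}s")
print(f"Load elapsed:   {load_elapsed:.1f}s")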