do not merge: bananza mode wip #744
base: main
Changes from 4 commits
@@ -59,7 +59,7 @@
 from __future__ import annotations
 
-from typing import ClassVar
+from typing import TYPE_CHECKING, ClassVar
 
 from airbyte_api.models import DestinationSnowflake
@@ -68,6 +68,11 @@
 from airbyte.destinations._translate_cache_to_dest import (
     snowflake_cache_to_destination_configuration,
 )
+
+if TYPE_CHECKING:
+    from airbyte.lakes import LakeStorage
+
+from airbyte.secrets.util import get_secret
 from airbyte.shared.sql_processor import RecordDedupeMode, SqlProcessorBase
@@ -86,6 +91,92 @@ def paired_destination_config(self) -> DestinationSnowflake:
         """Return a dictionary of destination configuration values."""
         return snowflake_cache_to_destination_configuration(cache=self)
 
+    def unload_stream_to_lake(
+        self,
+        stream_name: str,
+        lake_store: LakeStorage,
+    ) -> None:
+        """Unload a single stream to the lake store using Snowflake COPY INTO.
+
+        This implementation uses Snowflake's COPY INTO command to unload data
+        directly to S3 in Parquet format, with managed artifacts, for optimal
+        performance.
+
+        Args:
+            stream_name: The name of the stream to unload.
+            lake_store: The lake store to unload to.
+        """
+        sql_table = self.streams[stream_name].to_sql_table()
+        table_name = sql_table.name
+        aws_access_key_id = get_secret("AWS_ACCESS_KEY_ID")
+        aws_secret_access_key = get_secret("AWS_SECRET_ACCESS_KEY")
+
+        artifact_prefix = lake_store.get_artifact_prefix()
+        file_format_name = f"{artifact_prefix}PARQUET_FORMAT"
+        create_format_sql = f"""
+            CREATE FILE FORMAT IF NOT EXISTS {file_format_name}
+            TYPE = PARQUET
+            COMPRESSION = SNAPPY
+        """
+        self.execute_sql(create_format_sql)
+
+        stage_name = f"{artifact_prefix}STAGE"
+        create_stage_sql = f"""
+            CREATE STAGE IF NOT EXISTS {stage_name}
+            URL = '{lake_store.root_storage_uri}'
+            CREDENTIALS = (
+                AWS_KEY_ID = '{aws_access_key_id}'
+                AWS_SECRET_KEY = '{aws_secret_access_key}'
+            )
+            FILE_FORMAT = (FORMAT_NAME = '{file_format_name}')
+        """
+        self.execute_sql(create_stage_sql)
+
+        unload_statement = f"""
+            COPY INTO @{stage_name}/{stream_name}/
+            FROM {self._read_processor.sql_config.schema_name}.{table_name}
+            FILE_FORMAT = (FORMAT_NAME = '{file_format_name}')
+            OVERWRITE = TRUE
+        """
+        self.execute_sql(unload_statement)
+
+    def load_stream_from_lake(
+        self,
+        stream_name: str,
+        lake_store: LakeStorage,
+        *,
+        zero_copy: bool = False,
+    ) -> None:
+        """Load a single stream from the lake store using Snowflake COPY INTO.
+
+        This implementation uses Snowflake's COPY INTO command to load data
+        directly from S3 in Parquet format, with managed artifacts, for optimal
+        performance.
+
+        Args:
+            stream_name: The name of the stream to load.
+            lake_store: The lake store to load from.
+            zero_copy: Whether to use zero-copy loading. If True, the data will be
+                loaded without copying it into the cache. This is useful for large
+                datasets that don't need to be stored in the cache.
+        """
+        sql_table = self.streams[stream_name].to_sql_table()
+        table_name = sql_table.name
+
+        if zero_copy:
+            raise NotImplementedError("Zero-copy loading is not yet supported in Snowflake.")
+
+        artifact_prefix = lake_store.get_artifact_prefix()
+        file_format_name = f"{artifact_prefix}PARQUET_FORMAT"
+        stage_name = f"{artifact_prefix}STAGE"
+
+        load_statement = f"""
+            COPY INTO {self._read_processor.sql_config.schema_name}.{table_name}
+            FROM @{stage_name}/{stream_name}/
+            FILE_FORMAT = (FORMAT_NAME = '{file_format_name}')
+            MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
+            PURGE = FALSE
+        """
+        self.execute_sql(load_statement)
+
+
 # Expose the Cache class and also the Config class.
 __all__ = [
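For reviewers who want the generated SQL in one place, the sketch below spells out approximately what one unload/load round trip renders to for a stream named users. Every identifier here is an assumption for illustration only: the artifact prefix is taken to resolve to AIRBYTE_LAKE_, the cache schema to airbyte_raw, and the bucket URI is invented.

# Approximate SQL produced by unload_stream_to_lake("users", lake_store) and
# load_stream_from_lake("users", lake_store). All names below are illustrative
# assumptions, not values taken from this PR.
UNLOAD_SEQUENCE = [
    # One-time managed artifact: a named Parquet file format.
    """
    CREATE FILE FORMAT IF NOT EXISTS AIRBYTE_LAKE_PARQUET_FORMAT
        TYPE = PARQUET
        COMPRESSION = SNAPPY
    """,
    # One-time managed artifact: an external stage rooted at the lake URI.
    """
    CREATE STAGE IF NOT EXISTS AIRBYTE_LAKE_STAGE
        URL = 's3://my-bucket/lake/'
        CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
        FILE_FORMAT = (FORMAT_NAME = 'AIRBYTE_LAKE_PARQUET_FORMAT')
    """,
    # Per-stream unload into a dedicated prefix on the stage.
    """
    COPY INTO @AIRBYTE_LAKE_STAGE/users/
        FROM airbyte_raw.users
        FILE_FORMAT = (FORMAT_NAME = 'AIRBYTE_LAKE_PARQUET_FORMAT')
        OVERWRITE = TRUE
    """,
]

# Per-stream load back into the cache table. PURGE = FALSE keeps the Parquet
# files in the lake after loading; MATCH_BY_COLUMN_NAME maps Parquet columns
# to table columns case-insensitively.
LOAD_STATEMENT = """
    COPY INTO airbyte_raw.users
        FROM @AIRBYTE_LAKE_STAGE/users/
        FILE_FORMAT = (FORMAT_NAME = 'AIRBYTE_LAKE_PARQUET_FORMAT')
        MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
        PURGE = FALSE
"""

One design caveat worth flagging: CREATE STAGE as written embeds long-lived AWS keys in the statement text; Snowflake storage integrations are the usual way to grant stage access without putting keys in SQL.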
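And a hypothetical end-to-end usage sketch. The SnowflakeCache fields below match PyAirbyte's existing cache config; LakeStorage is introduced by this PR, and its constructor signature and the S3 location are assumptions here.

# Hypothetical round trip through the new lake API. LakeStorage's constructor
# is assumed; the rest uses PyAirbyte's existing public API.
import airbyte as ab
from airbyte.caches import SnowflakeCache
from airbyte.lakes import LakeStorage  # new module in this PR

cache = SnowflakeCache(
    account="my-account",          # placeholder Snowflake connection values
    username="my-user",
    password=ab.get_secret("SNOWFLAKE_PASSWORD"),
    database="my-db",
    warehouse="my-warehouse",
    role="my-role",
)

# Sync a small sample stream into the cache first.
source = ab.get_source("source-faker", config={"count": 1_000}, streams=["users"])
source.read(cache=cache)

lake_store = LakeStorage("s3://my-bucket/lake/")  # assumed signature

# Export the cached stream to Parquet files in the lake...
cache.unload_stream_to_lake("users", lake_store=lake_store)

# ...and load them back into the cache table via COPY INTO.
cache.load_stream_from_lake("users", lake_store=lake_store)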