Skip to content

Chore: New example sync to Snowflake Cortex #350

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
4 changes: 4 additions & 0 deletions airbyte/_executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ def _pump_input(
# Serialize each Airbyte message as JSON-lines and feed it to the
# subprocess's stdin pipe.
try:
    pipe.writelines(message.model_dump_json() + "\n" for message in messages)
    pipe.flush()  # Ensure data is sent immediately, not held in the buffer.
except BrokenPipeError:
    # The reader end of the pipe closed early — typically because the
    # connector subprocess failed and exited. Record a domain-specific
    # error for the owning thread; the subprocess's own logs describe
    # the underlying failure.
    exception_holder.set_exception(exc.AirbyteConnectorBrokenPipeError())
except Exception as ex:
    # Capture any other failure so the owning thread can re-raise it.
    exception_holder.set_exception(ex)

Expand Down
11 changes: 11 additions & 0 deletions airbyte/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,17 @@ def _get_log_file(self) -> Path | None:
return None


class AirbyteConnectorBrokenPipeError(AirbyteConnectorError, BrokenPipeError):
    """Broken pipe error occurred.

    This indicates that an upstream source failed and closed the pipe.

    Inherits from ``BrokenPipeError`` as well, so callers that catch the
    builtin exception will also catch this one.
    """

    # Shown to users alongside the error message to point them at the
    # actual root cause, which lives in the upstream connector's logs.
    guidance = (
        "Please check the upstream source logs for more information on the cause of the failure."
    )


class AirbyteConnectorExecutableNotFoundError(AirbyteConnectorError):
"""Connector executable not found."""

Expand Down
92 changes: 92 additions & 0 deletions examples/run_snowflake_destination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Demo: sync records from source-faker to Snowflake and Snowflake Cortex.

Usage:
    poetry install
    poetry run python examples/run_snowflake_destination.py
"""

from __future__ import annotations

import airbyte as ab
from airbyte.caches import SnowflakeCache
from airbyte.secrets.google_gsm import GoogleGSMSecretManager

# Number of records source-faker generates per stream.
SCALE = 100


AIRBYTE_INTERNAL_GCP_PROJECT = "dataline-integration-testing"

# All credentials are fetched from Google Secret Manager in Airbyte's
# internal integration-testing project.
gsm = GoogleGSMSecretManager(
    project=AIRBYTE_INTERNAL_GCP_PROJECT,
    credentials_json=ab.get_secret("GCP_GSM_CREDENTIALS"),
)

cache_secret = gsm.get_secret(secret_name="AIRBYTE_LIB_SNOWFLAKE_CREDS")
assert cache_secret is not None, "Secret not found."
cache_config = cache_secret.parse_json()

# Connector configs for the two destinations under test.
snowflake_destination_config = gsm.fetch_connector_secret(
    connector_name="destination-snowflake",
).parse_json()
cortex_destination_config = gsm.fetch_connector_secret(
    connector_name="destination-snowflake-cortex",
).parse_json()


source = ab.get_source(
    "source-faker",
    config={"count": SCALE},
    install_if_missing=True,
    streams=["products", "users"],
)
cache = SnowflakeCache(
    account=cache_config["account"],
    username=cache_config["username"],
    password=cache_config["password"],
    database=cache_config["database"],
    warehouse=cache_config["warehouse"],
    role=cache_config["role"],
)
snowflake_destination = ab.get_destination(
    "destination-snowflake",
    config={
        **snowflake_destination_config,
        "default_schema": "pyairbyte_tests",
    },
)
# Point the Cortex destination at the faker streams' text columns and the
# shared test schema before instantiating it.
cortex_destination_config["processing"]["text_fields"] = [
    "make",
    "model",
    "name",
    "gender",
]
cortex_destination_config["indexing"]["default_schema"] = "pyairbyte_tests"
cortex_destination = ab.get_destination(
    "destination-snowflake-cortex",
    config=cortex_destination_config,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider focusing on one destination type?

The setup for both Snowflake and Cortex destinations is comprehensive. For a demo script, though, we might want to keep things as simple as possible. What do you think about focusing on just one destination type? Maybe we could have separate example scripts for Snowflake and Cortex? This could make each example more focused and easier to follow. Wdyt?

If you agree, we could either:

  1. Keep just the Snowflake destination and remove the Cortex-specific code.
  2. Keep just the Cortex destination and remove the regular Snowflake destination code.
  3. Create two separate example scripts, one for each destination type.

# Uncomment to inspect the connector config specs:
# snowflake_destination.print_config_spec()
# cortex_destination.print_config_spec()

# Validate configs and connectivity before attempting the sync.
snowflake_destination.check()
cortex_destination.check()
source.check()

# Known-good path: write directly to the plain Snowflake destination.
# snowflake_write_result = snowflake_destination.write(
#     source,
#     cache=False,  # Toggle comment to test with/without caching
# )

# Write from source-faker straight to the Cortex destination, bypassing
# any local cache.
cortex_write_result = cortex_destination.write(
    source,
    cache=False,  # Toggle comment to test with/without caching
)

# Optional: read into the Snowflake cache and print per-stream counts.
# result = source.read(cache)

# for name in ["products"]:
#     print(f"Stream {name}: {len(list(result[name]))} records")