diff --git a/airbyte/_executors/__init__.py b/airbyte/_executors/__init__.py index 96fccc98..528f005a 100644 --- a/airbyte/_executors/__init__.py +++ b/airbyte/_executors/__init__.py @@ -1,2 +1,19 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. """Support for connector executors. This is currently a non-public API.""" + +from airbyte._executors.base import Executor +from airbyte._executors.declarative import DeclarativeExecutor +from airbyte._executors.docker import DockerExecutor +from airbyte._executors.java import JavaExecutor +from airbyte._executors.local import PathExecutor +from airbyte._executors.python import VenvExecutor + + +__all__ = [ + "Executor", + "DeclarativeExecutor", + "DockerExecutor", + "JavaExecutor", + "PathExecutor", + "VenvExecutor", +] diff --git a/airbyte/_executors/java.py b/airbyte/_executors/java.py new file mode 100644 index 00000000..5cd8ad13 --- /dev/null +++ b/airbyte/_executors/java.py @@ -0,0 +1,442 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +"""Java connector executor with automatic JRE management. + +This module provides the JavaExecutor class for running Java-based Airbyte connectors. +It automatically downloads and manages Zulu JRE installations in ~/.airbyte/java directory +and handles connector tar file extraction and execution. + +Fallback Logic: +- When use_java_tar=False: Java execution is explicitly disabled, fallback to Docker +- When use_java_tar=None and Docker available: Use Docker as the more stable option +- When use_java_tar=None and Docker unavailable: Use Java with automatic JRE download +- When use_java_tar is truthy: Use Java executor with specified or auto-detected tar file +""" + +from __future__ import annotations + +import os +import platform +import shutil +import subprocess +import sys +import tarfile +from pathlib import Path +from typing import TYPE_CHECKING + +import requests +from overrides import overrides +from rich import print # noqa: A004 # Allow shadowing the built-in + +from airbyte import exceptions as exc +from airbyte._executors.base import Executor +from airbyte._util.telemetry import EventState, log_install_state +from airbyte.version import get_version + + +if TYPE_CHECKING: + from collections.abc import Iterator + from typing import IO + + from airbyte.sources.registry import ConnectorMetadata + +from airbyte._message_iterators import AirbyteMessageIterator + + +class JavaExecutor(Executor): + """An executor for Java connectors that manages JRE installation and execution.""" + + def __init__( + self, + name: str | None = None, + *, + metadata: ConnectorMetadata | None = None, + target_version: str | None = None, + use_java: Path | str | bool | None = None, + use_java_tar: Path | str | None = None, + ) -> None: + """Initialize a Java connector executor. + + Args: + name: The name of the connector. + metadata: (Optional.) The metadata of the connector. + target_version: (Optional.) The version of the connector to install. + use_java: (Optional.) Java execution mode: True/False to enable/disable, + Path/str for custom JRE location, None for auto-detection. + use_java_tar: (Optional.) Path to connector tar file. If provided, + implies use_java=True unless use_java is explicitly False. + """ + super().__init__(name=name, metadata=metadata, target_version=target_version) + + if use_java_tar is not None and use_java is None: + use_java = True + + self.use_java = use_java + self.connector_tar_path = Path(use_java_tar) if use_java_tar else None + self.java_version = "21" + self.airbyte_home = Path.home() / ".airbyte" + self.java_cache_dir = self.airbyte_home / "java" + + self.os_name, self.arch = self._detect_platform() + self.jre_dir = self.java_cache_dir / f"{self.os_name}-{self.arch}" + + self.connector_dir = self.airbyte_home / "connectors" / self.name + + if self.connector_tar_path is None and self.use_java: + self.connector_tar_path = self._auto_detect_tar_path() + + def _auto_detect_tar_path(self) -> Path | None: + """Auto-detect the connector tar file path.""" + return None + + def _detect_platform(self) -> tuple[str, str]: + """Detect the current OS and architecture. + + Returns: + Tuple of (os_name, architecture) normalized for Azul API. + """ + system = platform.system().lower() + if system == "darwin": + os_name = "macos" + elif system == "linux": + os_name = "linux" + else: + raise exc.AirbyteConnectorInstallationError( + message=f"Unsupported operating system: {system}", + connector_name=self.name, + ) + + machine = platform.machine().lower() + if machine in {"arm64", "aarch64"}: + arch = "aarch64" + elif machine in {"x86_64", "amd64"}: + arch = "x64" + else: + raise exc.AirbyteConnectorInstallationError( + message=f"Unsupported architecture: {machine}", + connector_name=self.name, + ) + + return os_name, arch + + def _get_java_executable(self) -> Path: + """Get the path to the Java executable. + + Returns: + Path to the java executable. + """ + java_home_env = os.environ.get("JAVA_HOME") + if java_home_env: + java_home = Path(java_home_env) + java_executable = java_home / "bin" / "java" + if java_executable.exists(): + print(f"✅ Using JAVA from JAVA_HOME: {java_home}") + return java_executable + + java_executable = self.jre_dir / "bin" / "java" + if java_executable.exists(): + print(f"✅ Using cached JRE: {self.jre_dir}") + return java_executable + + print("🌐 JAVA not found — downloading portable JRE...") + self._download_jre() + return java_executable + + def _download_jre(self) -> None: + """Download and extract JRE from Azul API.""" + print(f"âŦ‡ī¸ Downloading Zulu JRE {self.java_version} for {self.os_name}/{self.arch}...") + + self.jre_dir.mkdir(parents=True, exist_ok=True) + + azul_api_url = ( + f"https://api.azul.com/metadata/v1/zulu/packages/" + f"?java_version={self.java_version}&os={self.os_name}&arch={self.arch}" + f"&java_package_type=jre&release_status=ga&availability_types=CA" + ) + + try: + response = requests.get( + azul_api_url, + headers={"User-Agent": f"PyAirbyte/{get_version()}"}, + timeout=30, + ) + response.raise_for_status() + azul_data = response.json() + except requests.RequestException as ex: + raise exc.AirbyteConnectorInstallationError( + message="Failed to query Azul API for JRE download.", + connector_name=self.name, + context={ + "azul_api_url": azul_api_url, + "os": self.os_name, + "arch": self.arch, + }, + ) from ex + + jre_url = None + for package in azul_data: + download_url = package.get("download_url", "") + if download_url.endswith(".tar.gz"): + jre_url = download_url + break + + if not jre_url: + raise exc.AirbyteConnectorInstallationError( + message="Failed to find JRE download URL from Azul API.", + connector_name=self.name, + context={ + "azul_api_url": azul_api_url, + "os": self.os_name, + "arch": self.arch, + "available_packages": len(azul_data), + }, + ) + + print(f"🔗 JRE download URL: {jre_url}") + self._download_and_extract_jre(jre_url) + + def _download_and_extract_jre(self, jre_url: str) -> None: + """Download and extract JRE from the given URL.""" + try: + response = requests.get(jre_url, stream=True, timeout=300) + response.raise_for_status() + + with tarfile.open(fileobj=response.raw, mode="r|gz") as tar: + self._extract_jre_with_strip_components(tar) + + print(f"✅ JRE downloaded and extracted to: {self.jre_dir}") + + except (requests.RequestException, tarfile.TarError) as ex: + raise exc.AirbyteConnectorInstallationError( + message="Failed to download or extract JRE.", + connector_name=self.name, + context={ + "jre_url": jre_url, + "jre_dir": str(self.jre_dir), + }, + ) from ex + + def _extract_jre_with_strip_components(self, tar: tarfile.TarFile) -> None: + """Extract JRE tar file while stripping the top-level directory. + + This method implements the equivalent of `tar --strip-components=1` behavior. + JRE tar files from Azul typically contain a root directory like + 'zulu21.28.85-ca-jre21.0.0-linux_x64/' that wraps all the actual JRE files. + We want to extract the JRE contents directly to self.jre_dir without + creating this wrapper directory. + + Args: + tar: The opened tarfile.TarFile object to extract from. + """ + root_dir = None + + for member in tar: + if root_dir is None: + root_dir = member.name.split("/")[0] + + if member.name.startswith(root_dir + "/"): + stripped_name = member.name[len(root_dir) + 1 :] + if not stripped_name: # Skip empty names (root directory itself) + continue + + target_path = self.jre_dir / stripped_name + + if member.isdir(): + target_path.mkdir(parents=True, exist_ok=True) + elif member.isfile(): + target_path.parent.mkdir(parents=True, exist_ok=True) + source = tar.extractfile(member) + if source: + with source: + target_path.write_bytes(source.read()) + target_path.chmod(member.mode) + elif member.issym(): + target_path.parent.mkdir(parents=True, exist_ok=True) + target_path.symlink_to(member.linkname) + elif member.name == root_dir: + continue + + def _extract_connector_tar(self) -> None: + """Extract the connector tar file to the connector directory.""" + if not self.connector_tar_path or not self.connector_tar_path.exists(): + raise exc.AirbyteConnectorInstallationError( + message="Connector tar file not found.", + connector_name=self.name, + context={ + "connector_tar_path": str(self.connector_tar_path), + }, + ) + + self.connector_dir.mkdir(parents=True, exist_ok=True) + print(f"đŸ“Ļ Extracting connector tar: {self.connector_tar_path!s} to {self.connector_dir!s}") + + try: + with tarfile.open(self.connector_tar_path, "r") as tar: + tar.extractall(self.connector_dir) + + print(f"✅ Connector extracted to: {self.connector_dir}") + + except tarfile.TarError as ex: + raise exc.AirbyteConnectorInstallationError( + message="Failed to extract connector tar file.", + connector_name=self.name, + context={ + "connector_tar_path": str(self.connector_tar_path), + "connector_dir": str(self.connector_dir), + }, + ) from ex + + def _get_connector_executable(self) -> Path: + """Get the path to the connector executable.""" + app_dir = self.connector_dir / "airbyte-app" + if app_dir.exists(): + bin_dir = app_dir / "bin" + if bin_dir.exists(): + executable = bin_dir / self.name + if executable.exists(): + return executable + + for bin_dir in self.connector_dir.rglob("bin"): + for executable in bin_dir.iterdir(): + if ( + executable.is_file() and executable.stat().st_mode & 0o111 + ): # Check if executable + return executable + + raise exc.AirbyteConnectorInstallationError( + message="Could not find connector executable in extracted tar.", + connector_name=self.name, + context={ + "connector_dir": str(self.connector_dir), + "expected_path": str(self.connector_dir / "airbyte-app" / "bin" / self.name), + }, + ) + + @overrides + def install(self) -> None: + """Install the Java connector by extracting tar and ensuring JRE is available.""" + try: + self._extract_connector_tar() + + _ = self._get_java_executable() + + log_install_state(self.name, state=EventState.SUCCEEDED) + print(f"✅ Java connector '{self.name}' installed successfully!", file=sys.stderr) + + except Exception as ex: + log_install_state(self.name, state=EventState.FAILED, exception=ex) + raise + + @overrides + def uninstall(self) -> None: + """Uninstall the Java connector by removing the connector directory.""" + if self.connector_dir.exists(): + shutil.rmtree(self.connector_dir) + print(f"đŸ—‘ī¸ Removed connector directory: {self.connector_dir}") + + @overrides + def ensure_installation(self, *, auto_fix: bool = True) -> None: + """Ensure that the Java connector is installed and JRE is available.""" + if not self.connector_dir.exists(): + if auto_fix: + self.install() + else: + raise exc.AirbyteConnectorInstallationError( + message="Connector directory does not exist.", + connector_name=self.name, + context={"connector_dir": str(self.connector_dir)}, + ) + else: + try: + self._get_connector_executable() + self._get_java_executable() + except Exception as ex: + if auto_fix: + self.install() + else: + raise exc.AirbyteConnectorInstallationError( + message="Connector or JRE not properly installed.", + connector_name=self.name, + ) from ex + + @property + def _cli(self) -> list[str]: + """Get the base args of the CLI executable.""" + connector_executable = self._get_connector_executable() + return [str(connector_executable)] + + @overrides + def execute( + self, + args: list[str], + *, + stdin: IO[str] | AirbyteMessageIterator | None = None, + suppress_stderr: bool = False, + ) -> Iterator[str]: + """Execute the Java connector with proper environment setup.""" + java_executable = self._get_java_executable() + java_home = java_executable.parent.parent + + env = os.environ.copy() + env["JAVA_HOME"] = str(java_home) + env["PATH"] = f"{java_home / 'bin'}:{env.get('PATH', '')}" + + connector_executable = self._get_connector_executable() + + mapped_args = [] + for arg in args: + if arg in {"spec", "check", "discover", "read"}: + mapped_args.append(f"--{arg}") + else: + mapped_args.append(arg) + + cmd = [str(connector_executable), *mapped_args] + + print( + f"🚀 Running Java connector with 'JAVA_HOME={java_home!s}': '{' '.join(cmd)}'", + file=sys.stderr, + ) + + process = subprocess.Popen( + cmd, + stdin=subprocess.PIPE if isinstance(stdin, AirbyteMessageIterator) else stdin, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE if not suppress_stderr else subprocess.DEVNULL, + universal_newlines=True, + encoding="utf-8", + env=env, + ) + + if isinstance(stdin, AirbyteMessageIterator): + try: + for message in stdin: + if process.stdin: + process.stdin.write(message.model_dump_json() + "\n") + process.stdin.flush() + if process.stdin: + process.stdin.close() + except BrokenPipeError: + pass + + if process.stdout: + try: + while True: + line = process.stdout.readline() + if not line: + break + yield line.rstrip("\n\r") + finally: + process.stdout.close() + + exit_code = process.wait() + + if exit_code not in {0, -15}: + stderr_output = "" + if process.stderr: + stderr_output = process.stderr.read() + process.stderr.close() + + raise exc.AirbyteSubprocessFailedError( + run_args=cmd, + exit_code=exit_code, + log_text=stderr_output, + ) diff --git a/airbyte/_executors/util.py b/airbyte/_executors/util.py index e33a9c05..928833b1 100644 --- a/airbyte/_executors/util.py +++ b/airbyte/_executors/util.py @@ -2,7 +2,6 @@ from __future__ import annotations import hashlib -import sys import tempfile from pathlib import Path from typing import TYPE_CHECKING, Literal, cast @@ -14,9 +13,10 @@ from airbyte import exceptions as exc from airbyte._executors.declarative import DeclarativeExecutor from airbyte._executors.docker import DEFAULT_AIRBYTE_CONTAINER_TEMP_DIR, DockerExecutor +from airbyte._executors.java import JavaExecutor from airbyte._executors.local import PathExecutor from airbyte._executors.python import VenvExecutor -from airbyte._util.meta import which +from airbyte._util.meta import is_docker_installed, which from airbyte._util.telemetry import EventState, log_install_state # Non-public API from airbyte.constants import AIRBYTE_OFFLINE_MODE, TEMP_DIR_OVERRIDE from airbyte.sources.registry import ConnectorMetadata, InstallType, get_connector_metadata @@ -148,7 +148,7 @@ def _get_local_executor( # `local_executable` is now a Path object - print(f"Using local `{name}` executable: {local_executable!s}", file=sys.stderr) + print(f"Using local `{name}` executable: {local_executable!s}") return PathExecutor( name=name, path=local_executable, @@ -167,27 +167,30 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0914, PLR0915, C901 # install_if_missing: bool = True, install_root: Path | None = None, use_python: bool | Path | str | None = None, + use_java: Path | str | bool | None = None, + use_java_tar: Path | str | None = None, ) -> Executor: """This factory function creates an executor for a connector. + For Java connectors (InstallType.JAVA), the fallback logic is: + - If use_java is False: Use Docker (explicitly disabled Java) + - If use_java is None and use_java_tar is None and Docker is available: Use Docker + - If use_java is None and use_java_tar is None and Docker is not available: Use Java + - If use_java is truthy or use_java_tar is provided: Use Java executor + - use_java_tar being provided implies use_java=True unless use_java is explicitly False + For documentation of each arg, see the function `airbyte.sources.util.get_source()`. """ install_method_count = sum( [ bool(local_executable), bool(docker_image), - bool(pip_url) or bool(use_python), + bool(pip_url), bool(source_manifest), + bool(use_java) or bool(use_java_tar), ] ) - if use_python is False: - docker_image = True - - if use_python is None and pip_url is not None: - # If pip_url is set, we assume the user wants to use Python. - use_python = True - if version and pip_url: raise exc.PyAirbyteInputError( message=( @@ -204,13 +207,15 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0914, PLR0915, C901 # raise exc.PyAirbyteInputError( message=( "You can only specify one of the settings: 'local_executable', 'docker_image', " - "'source_manifest', or 'pip_url'." + "'source_manifest', 'pip_url', 'use_java', or 'use_java_tar'." ), context={ "local_executable": local_executable, "docker_image": docker_image, "pip_url": pip_url, "source_manifest": source_manifest, + "use_java": use_java, + "use_java_tar": use_java_tar, }, ) metadata: ConnectorMetadata | None = None @@ -249,8 +254,26 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0914, PLR0915, C901 # case InstallType.YAML: source_manifest = True case InstallType.PYTHON: - pip_url = metadata.pypi_package_name - pip_url = f"{pip_url}=={version}" if version else pip_url + if use_python is False: + docker_image = True + else: + pip_url = metadata.pypi_package_name + pip_url = f"{pip_url}=={version}" if version else pip_url + case InstallType.JAVA: + effective_use_java = use_java + if use_java_tar is not None and use_java is None: + effective_use_java = True + + if effective_use_java is False: + docker_image = True + elif effective_use_java is None: + if is_docker_installed(): + docker_image = True + else: + use_java = True + else: + # use_java is truthy or use_java_tar is provided, use Java + pass # Will be handled by the Java executor block later case _: docker_image = True @@ -340,23 +363,34 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0914, PLR0915, C901 # components_py_checksum=components_py_checksum, ) + if use_java or use_java_tar: + executor = JavaExecutor( + name=name, + metadata=metadata, + target_version=version, + use_java=use_java, + use_java_tar=use_java_tar, + ) + if install_if_missing: + executor.ensure_installation() + return executor + # else: we are installing a connector in a Python virtual environment: try: - executor = VenvExecutor( + venv_executor = VenvExecutor( name=name, metadata=metadata, target_version=version, pip_url=pip_url, install_root=install_root, - use_python=use_python, ) if install_if_missing: - executor.ensure_installation() + venv_executor.ensure_installation() except Exception as e: log_install_state(name, state=EventState.FAILED, exception=e) raise else: # No exceptions were raised, so return the executor. - return executor + return venv_executor diff --git a/airbyte/destinations/util.py b/airbyte/destinations/util.py index 2fe5bf8e..49b04fef 100644 --- a/airbyte/destinations/util.py +++ b/airbyte/destinations/util.py @@ -31,6 +31,8 @@ def get_destination( # noqa: PLR0913 # Too many arguments use_host_network: bool = False, install_if_missing: bool = True, install_root: Path | None = None, + use_java: Path | str | bool | None = None, + use_java_tar: Path | str | None = None, ) -> Destination: """Get a connector by name and version. @@ -69,6 +71,12 @@ def get_destination( # noqa: PLR0913 # Too many arguments parameter is ignored when local_executable is set. install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used. + use_java: (Optional.) Java execution mode: `True` to enable Java execution with automatic + JRE management, `False` to explicitly disable Java execution, `Path` or `str` to specify + a custom JRE location, or `None` for auto-detection based on connector type. + use_java_tar: (Optional.) Path to Java connector tar file. If provided, implies + `use_java=True` unless `use_java` is explicitly set to `False`. Use `None` for + auto-detection of connector tar files. """ return Destination( name=name, @@ -84,6 +92,8 @@ def get_destination( # noqa: PLR0913 # Too many arguments use_host_network=use_host_network, install_if_missing=install_if_missing, install_root=install_root, + use_java=use_java, + use_java_tar=use_java_tar, ), ) diff --git a/airbyte/sources/util.py b/airbyte/sources/util.py index f2c52c11..97c5f476 100644 --- a/airbyte/sources/util.py +++ b/airbyte/sources/util.py @@ -59,6 +59,8 @@ def get_source( # noqa: PLR0913 # Too many arguments source_manifest: bool | dict | Path | str | None = None, install_if_missing: bool = True, install_root: Path | None = None, + use_java: Path | str | bool | None = None, + use_java_tar: Path | str | None = None, ) -> Source: """Get a connector by name and version. @@ -111,6 +113,12 @@ def get_source( # noqa: PLR0913 # Too many arguments parameter is ignored when `local_executable` or `source_manifest` are set. install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used. + use_java: (Optional.) Java execution mode: `True` to enable Java execution with automatic + JRE management, `False` to explicitly disable Java execution, `Path` or `str` to specify + a custom JRE location, or `None` for auto-detection based on connector type. + use_java_tar: (Optional.) Path to Java connector tar file. If provided, implies + `use_java=True` unless `use_java` is explicitly set to `False`. Use `None` for + auto-detection of connector tar files. """ return Source( name=name, @@ -128,6 +136,8 @@ def get_source( # noqa: PLR0913 # Too many arguments source_manifest=source_manifest, install_if_missing=install_if_missing, install_root=install_root, + use_java=use_java, + use_java_tar=use_java_tar, ), ) diff --git a/examples/run_java_connector_tests.py b/examples/run_java_connector_tests.py new file mode 100755 index 00000000..99e8fc70 --- /dev/null +++ b/examples/run_java_connector_tests.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 +"""Example script demonstrating Java connector support with source-snowflake. + +Usage: + poetry run examples/run_java_connector_tests.py + +Requirements: + - DEVIN_GCP_SERVICE_ACCOUNT_JSON environment variable set + - GSM secrets configured for source-snowflake +""" + +from __future__ import annotations + +import os +import re +import tempfile +from pathlib import Path + +import airbyte as ab +import requests +from airbyte.secrets.google_gsm import GoogleGSMSecretManager, GSMSecretHandle + + +def unset_java_home() -> None: + """Unset JAVA_HOME environment variable to avoid conflicts with auto-downloaded JRE.""" + if "JAVA_HOME" in os.environ: + del os.environ["JAVA_HOME"] + print("✅ Unset JAVA_HOME to avoid conflicts with auto-downloaded JRE.") + else: + print("â„šī¸ JAVA_HOME was not set, no need to unset it.") + + +def download_snowflake_tar() -> Path: + """Download source-snowflake tar file from Google Drive.""" + file_id = "1S0yMrdhs2TLu5u1yvj-52kaeagRAG9ZR" + + # Create session and get initial response + session = requests.Session() + response = session.get(f"https://drive.google.com/uc?export=download&id={file_id}") + + if "virus scan warning" in response.text.lower(): + uuid_match = re.search(r'name="uuid" value="([^"]+)"', response.text) + uuid_value = uuid_match.group(1) if uuid_match else "" + + response = session.get( + f"https://drive.usercontent.google.com/download?id={file_id}&export=download&confirm=t&uuid={uuid_value}" + ) + + fd, temp_path = tempfile.mkstemp(suffix=".tar") + temp_file = Path(temp_path) + os.close(fd) + temp_file.write_bytes(response.content) + return temp_file + + +def get_connector_config( + connector_name: str, + secret_name: str | None = None, +) -> dict[str, str]: + """Retrieve the connector configuration.""" + secret_mgr = GoogleGSMSecretManager( + project="dataline-integration-testing", + credentials_json=os.environ.get("DEVIN_GCP_SERVICE_ACCOUNT_JSON"), + ) + if secret_name is None: + secret: GSMSecretHandle = secret_mgr.fetch_connector_secret(connector_name) + return secret.parse_json() + + secret: GSMSecretHandle = secret_mgr.get_secret_handle(secret_name) + return secret.parse_json() + + +def main() -> None: + """Main function demonstrating Java connector usage.""" + print("🚀 PyAirbyte Java Connector Demo - source-snowflake") + print("=" * 60) + + print("🔧 Unsetting JAVA_HOME to avoid conflicts with auto-downloaded JRE...") + unset_java_home() + + print("đŸ“Ĩ Downloading source-snowflake tar file...") + tar_path = download_snowflake_tar() + print(f"✅ Downloaded tar to: {tar_path}") + + config = get_connector_config( + "source-snowflake", + # Default OAuth creds don't work for some reason. So we use a custom secret: + secret_name="SECRET_SOURCE-SNOWFLAKE__CREDS", + ) + print(f"✅ Retrieved config for account: {config.get('account', 'N/A')}") + + # Create source with Java execution using downloaded tar + source = ab.get_source( + "source-snowflake", + config=config, + use_java_tar=tar_path, + # use_java=True, # Implied by use_java_tar + ) + print("✅ Source created successfully!") + + _ = source.config_spec + print("✅ Config spec retrieved successfully!") + + source.check() + print("✅ Connection check passed") + stream_names = source.get_available_streams() + print(f"📊 Found {len(stream_names)} streams") + + selected_stream = stream_names[0] + source.select_streams([selected_stream]) + read_result = source.read() + records = list(read_result[selected_stream])[:10] + print(f"✅ Read {len(records)} records using Java connector!") + + +if __name__ == "__main__": + main()