Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions airbyte/_connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import abc
import json
import sys
from pathlib import Path
from time import sleep
from typing import TYPE_CHECKING, Any, Literal
Expand Down Expand Up @@ -106,7 +107,7 @@ def _print_error_message(
message: str,
) -> None:
"""Print a message to the console and the logger."""
rich.print(f"ERROR: {message}")
rich.print(f"ERROR: {message}", file=sys.stderr)
if self._file_logger:
self._file_logger.error(message)

Expand Down Expand Up @@ -248,14 +249,26 @@ def print_config_spec(
format: Literal["yaml", "json"] = "yaml", # noqa: A002
*,
output_file: Path | str | None = None,
stderr: bool = False,
) -> None:
"""Print the configuration spec for this connector.

Args:
format: The format to print the spec in. Must be "yaml" or "json".
output_file: Optional. If set, the spec will be written to the given file path.
Otherwise, it will be printed to the console.
stderr: If True, print to stderr instead of stdout. This is useful when we
want to print the spec to the console but not interfere with other output.
"""
if output_file and stderr:
raise exc.PyAirbyteInputError(
message="You can set output_file or stderr but not both.",
context={
"output_file": output_file,
"stderr": stderr,
},
)

if format not in {"yaml", "json"}:
raise exc.PyAirbyteInputError(
message="Invalid format. Expected 'yaml' or 'json'",
Expand All @@ -274,7 +287,7 @@ def print_config_spec(
return

syntax_highlighted = Syntax(content, format)
rich.print(syntax_highlighted)
rich.print(syntax_highlighted, file=sys.stderr if stderr else None)

@property
def _yaml_spec(self) -> str:
Expand Down Expand Up @@ -325,7 +338,10 @@ def check(self) -> None:
for msg in self._execute(["check", "--config", config_file]):
if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus:
if msg.connectionStatus.status != Status.FAILED:
rich.print(f"Connection check succeeded for `{self.name}`.")
rich.print(
f"Connection check succeeded for `{self.name}`.",
file=sys.stderr,
)
log_connector_check_result(
name=self.name,
state=EventState.SUCCEEDED,
Expand Down Expand Up @@ -359,7 +375,10 @@ def check(self) -> None:
def install(self) -> None:
"""Install the connector if it is not yet installed."""
self.executor.install()
rich.print("For configuration instructions, see: \n" f"{self.docs_url}#reference\n")
rich.print(
f"For configuration instructions, see: \n{self.docs_url}#reference\n",
file=sys.stderr,
)

def uninstall(self) -> None:
"""Uninstall the connector if it is installed.
Expand Down
9 changes: 6 additions & 3 deletions airbyte/_executors/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ def install(self) -> None:
pip_path = str(get_bin_dir(self._get_venv_path()) / "pip")
print(
f"Installing '{self.name}' into virtual environment '{self._get_venv_path()!s}'.\n"
f"Running 'pip install {self.pip_url}'...\n"
f"Running 'pip install {self.pip_url}'...\n",
file=sys.stderr,
)
try:
self._run_subprocess_and_raise_on_failure(
Expand All @@ -134,7 +135,8 @@ def install(self) -> None:
print(
f"Connector '{self.name}' installed successfully!\n"
f"For more information, see the {self.name} documentation:\n"
f"{self.docs_url}#reference\n"
f"{self.docs_url}#reference\n",
file=sys.stderr,
)

@overrides
Expand Down Expand Up @@ -241,7 +243,8 @@ def ensure_installation(
# This is sometimes caused by a failed or partial installation.
print(
"Connector executable not found within the virtual environment "
f"at {self._get_connector_path()!s}.\nReinstalling..."
f"at {self._get_connector_path()!s}.\nReinstalling...",
file=sys.stderr,
)
self.uninstall()
self.install()
Expand Down
3 changes: 2 additions & 1 deletion airbyte/_executors/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import annotations

import hashlib
import sys
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING, Literal, cast
Expand Down Expand Up @@ -147,7 +148,7 @@ def _get_local_executor(

# `local_executable` is now a Path object

print(f"Using local `{name}` executable: {local_executable!s}")
print(f"Using local `{name}` executable: {local_executable!s}", file=sys.stderr)
return PathExecutor(
name=name,
path=local_executable,
Expand Down
7 changes: 4 additions & 3 deletions airbyte/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@

from __future__ import annotations

import sys
from pathlib import Path
from typing import TYPE_CHECKING, Any

Expand Down Expand Up @@ -339,8 +340,8 @@ def validate(
pip_url=pip_url,
)

print("Getting `spec` output from connector...")
connector_obj.print_config_spec()
print("Getting `spec` output from connector...", file=sys.stderr)
connector_obj.print_config_spec(stderr=True)

if config:
print("Running connector check...")
Expand Down Expand Up @@ -435,7 +436,7 @@ def benchmark(
else get_noop_destination()
)

click.echo("Running benchmarks...")
click.echo("Running benchmarks...", sys.stderr)
destination_obj.write(
source_data=source_obj,
cache=False,
Expand Down
7 changes: 4 additions & 3 deletions airbyte/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import logging
import os
import platform
import sys
import tempfile
import warnings
from functools import lru_cache
Expand Down Expand Up @@ -153,7 +154,7 @@ def get_global_file_logger() -> logging.Logger | None:
return None

logfile_path = folder / f"airbyte-log-{str(ulid.ULID())[2:11]}.log"
print(f"Writing PyAirbyte logs to file: {logfile_path!s}")
print(f"Writing PyAirbyte logs to file: {logfile_path!s}", file=sys.stderr)

file_handler = logging.FileHandler(
filename=logfile_path,
Expand Down Expand Up @@ -242,7 +243,7 @@ def get_global_stats_logger() -> structlog.BoundLogger:
# No temp directory available, so return no-op logger without handlers
return structlog.get_logger("airbyte.stats")

print(f"Writing PyAirbyte performance stats to file: {logfile_path!s}")
print(f"Writing PyAirbyte performance stats to file: {logfile_path!s}", file=sys.stderr)

# Remove any existing handlers
for handler in logger.handlers:
Expand Down Expand Up @@ -299,7 +300,7 @@ def new_passthrough_file_logger(connector_name: str) -> logging.Logger:
global_logger = get_global_file_logger()
logfile_path = folder / f"{connector_name}-log-{str(ulid.ULID())[2:11]}.log"
logfile_msg = f"Writing `{connector_name}` logs to file: {logfile_path!s}"
print(logfile_msg)
print(logfile_msg, file=sys.stderr)
if global_logger:
global_logger.info(logfile_msg)

Expand Down
6 changes: 6 additions & 0 deletions airbyte/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from enum import Enum, auto
from typing import IO, TYPE_CHECKING, Any, Literal, cast

from rich.console import Console
from rich.errors import LiveError
from rich.live import Live as RichLive
from rich.markdown import Markdown as RichMarkdown
Expand Down Expand Up @@ -211,6 +212,7 @@ def __init__(

# Progress bar properties
self._last_update_time: float | None = None
self._stderr_console: Console | None = None
self._rich_view: RichLive | None = None

self.reset_progress_style(style)
Expand Down Expand Up @@ -622,9 +624,13 @@ def _start_rich_view(self) -> None:
"""
if self.style == ProgressStyle.RICH and not self._rich_view:
try:
if self._stderr_console is None:
self._stderr_console = Console(stderr=True)

self._rich_view = RichLive(
auto_refresh=True,
refresh_per_second=DEFAULT_REFRESHES_PER_SECOND,
console=self._stderr_console,
)
self._rich_view.start()
except Exception:
Expand Down
47 changes: 7 additions & 40 deletions airbyte/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@

from __future__ import annotations

import json
import sys
import warnings
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal

import yaml
from rich import print # noqa: A004 # Allow shadowing the built-in
from rich.syntax import Syntax

from airbyte_protocol.models import (
AirbyteCatalog,
Expand Down Expand Up @@ -49,7 +47,7 @@
from airbyte.shared.state_writers import StateWriterBase


class Source(ConnectorBase): # noqa: PLR0904
class Source(ConnectorBase):
"""A class representing a source that can be called."""

connector_type = "source"
Expand Down Expand Up @@ -202,12 +200,14 @@ def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
"""Logs a warning message indicating stream selection which are not selected yet."""
if streams == "*":
print(
"Warning: Config is not set yet. All streams will be selected after config is set."
"Warning: Config is not set yet. All streams will be selected after config is set.",
file=sys.stderr,
)
else:
print(
"Warning: Config is not set yet. "
f"Streams to be selected after config is set: {streams}"
f"Streams to be selected after config is set: {streams}",
file=sys.stderr,
)

def select_all_streams(self) -> None:
Expand Down Expand Up @@ -347,39 +347,6 @@ def config_spec(self) -> dict[str, Any]:
"""
return self._get_spec(force_refresh=True).connectionSpecification

def print_config_spec(
self,
format: Literal["yaml", "json"] = "yaml", # noqa: A002
*,
output_file: Path | str | None = None,
) -> None:
"""Print the configuration spec for this connector.

Args:
format: The format to print the spec in. Must be "yaml" or "json".
output_file: Optional. If set, the spec will be written to the given file path.
Otherwise, it will be printed to the console.
"""
if format not in {"yaml", "json"}:
raise exc.PyAirbyteInputError(
message="Invalid format. Expected 'yaml' or 'json'",
input_value=format,
)
if isinstance(output_file, str):
output_file = Path(output_file)

if format == "yaml":
content = yaml.dump(self.config_spec, indent=2)
elif format == "json":
content = json.dumps(self.config_spec, indent=2)

if output_file:
output_file.write_text(content)
return

syntax_highlighted = Syntax(content, format)
print(syntax_highlighted)

@property
def _yaml_spec(self) -> str:
"""Get the spec as a yaml string.
Expand Down Expand Up @@ -690,7 +657,7 @@ def _log_incremental_streams(
f"{incremental_streams}\n"
"To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
)
print(log_message)
print(log_message, file=sys.stderr)

def read(
self,
Expand Down