3 changes: 2 additions & 1 deletion airbyte/_connector_base.py
@@ -465,7 +465,8 @@ def _execute(
         )
 
         try:
-            for line in self.executor.execute(args, stdin=stdin):
+            suppress_stderr = progress_tracker is not None
+            for line in self.executor.execute(args, stdin=stdin, suppress_stderr=suppress_stderr):
                 try:
                     message: AirbyteMessage = AirbyteMessage.model_validate_json(json_data=line)
                     if progress_tracker and message.record:
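For context, this is the call site that computes suppress_stderr: the child's stderr is dropped exactly when a progress tracker is rendering to the terminal. A minimal standalone sketch of the same pattern using only the standard library (stream_stdout_lines and the tracker object are illustrative names, not PyAirbyte API):

    import subprocess
    import sys

    def stream_stdout_lines(args: list[str], *, suppress_stderr: bool = False):
        """Yield stdout lines from a child process, optionally discarding its stderr."""
        # A live progress display owns the terminal, so interleaved stderr output
        # would corrupt its rendering; route stderr to /dev/null in that case.
        with subprocess.Popen(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL if suppress_stderr else None,
            text=True,
        ) as process:
            assert process.stdout is not None  # Guaranteed by stdout=PIPE.
            yield from process.stdout

    progress_tracker = object()  # Stand-in: any non-None tracker suppresses stderr.
    for line in stream_stdout_lines(
        [sys.executable, "-c", "print('hello')"],
        suppress_stderr=progress_tracker is not None,
    ):
        print(line, end="")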
15 changes: 13 additions & 2 deletions airbyte/_executors/base.py
@@ -43,6 +43,11 @@ def _pump_input(
     try:
         pipe.writelines(message.model_dump_json() + "\n" for message in messages)
         pipe.flush()  # Ensure data is sent immediately
+    except (BrokenPipeError, OSError) as ex:
+        if isinstance(ex, BrokenPipeError):
+            pass  # Expected during graceful shutdown
+        else:
+            exception_holder.set_exception(ex)
     except Exception as ex:
         exception_holder.set_exception(ex)
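BrokenPipeError is a subclass of OSError, which is why the combined except clause needs the isinstance check to separate a graceful consumer shutdown from a real I/O failure. A self-contained sketch of the same pattern (the one-line child script is illustrative):

    import subprocess
    import sys

    # A child that reads a single line and exits, so later writes hit a closed pipe.
    child = subprocess.Popen(
        [sys.executable, "-c", "import sys; sys.stdin.readline()"],
        stdin=subprocess.PIPE,
        text=True,
    )
    assert child.stdin is not None
    try:
        for i in range(100_000):
            child.stdin.write(f"message {i}\n")
            child.stdin.flush()
    except (BrokenPipeError, OSError) as ex:
        if isinstance(ex, BrokenPipeError):
            pass  # Expected: the consumer finished and closed its end of the pipe.
        else:
            raise  # Any other OSError is a genuine failure worth surfacing.
    finally:
        child.wait()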

@@ -62,16 +67,18 @@ def _stream_from_subprocess(
     *,
     stdin: IO[str] | AirbyteMessageIterator | None = None,
     log_file: IO[str] | None = None,
+    suppress_stderr: bool = False,
 ) -> Generator[Iterable[str], None, None]:
     """Stream lines from a subprocess."""
     input_thread: Thread | None = None
     exception_holder = ExceptionHolder()
     if isinstance(stdin, AirbyteMessageIterator):
+        stderr_target = subprocess.DEVNULL if suppress_stderr else log_file
         process = subprocess.Popen(
             args,
             stdin=subprocess.PIPE,
             stdout=subprocess.PIPE,
-            stderr=log_file,
+            stderr=stderr_target,
             universal_newlines=True,
             encoding="utf-8",
         )
@@ -95,11 +102,12 @@
 
     else:
         # stdin is None or a file-like object
+        stderr_target = subprocess.DEVNULL if suppress_stderr else log_file
        process = subprocess.Popen(
            args,
            stdin=stdin,
            stdout=subprocess.PIPE,
-           stderr=log_file,
+           stderr=stderr_target,
            universal_newlines=True,
            encoding="utf-8",
        )
@@ -199,15 +207,18 @@ def execute(
         args: list[str],
         *,
         stdin: IO[str] | AirbyteMessageIterator | None = None,
+        suppress_stderr: bool = False,
     ) -> Iterator[str]:
         """Execute a command and return an iterator of STDOUT lines.
 
         If stdin is provided, it will be passed to the subprocess as STDIN.
+        If suppress_stderr is True, stderr output will be suppressed to reduce noise.
         """
         mapped_args = self.map_cli_args(args)
         with _stream_from_subprocess(
             [*self._cli, *mapped_args],
             stdin=stdin,
+            suppress_stderr=suppress_stderr,
         ) as stream_lines:
             yield from stream_lines
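Worth noting about the execute method above: wrapping a context-managed stream in a generator (the `with ... as stream_lines: yield from stream_lines` shape) ties subprocess cleanup to the consumer's lifetime. A compact stand-in showing the mechanics, with illustrative names:

    from collections.abc import Iterator
    from contextlib import contextmanager

    @contextmanager
    def _stream_lines(lines: list[str]):
        # Stands in for _stream_from_subprocess: set up, hand out an iterable, clean up.
        print("setup: process started")
        try:
            yield iter(lines)
        finally:
            print("teardown: process reaped")  # Runs even if the consumer stops early.

    def execute() -> Iterator[str]:
        with _stream_lines(["a", "b", "c"]) as stream:
            yield from stream  # Cleanup is deferred until this generator is closed.

    it = execute()
    print(next(it))  # First next() runs setup, then yields "a".
    it.close()       # Closing the generator triggers the finally-block teardown.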
3 changes: 2 additions & 1 deletion airbyte/_executors/declarative.py
@@ -104,9 +104,10 @@ def execute(
         args: list[str],
         *,
         stdin: IO[str] | AirbyteMessageIterator | None = None,
+        suppress_stderr: bool = False,
     ) -> Iterator[str]:
         """Execute the declarative source."""
-        _ = stdin  # Not used
+        _ = stdin, suppress_stderr  # Not used
         source_entrypoint = AirbyteEntrypoint(self.declarative_source)
 
         mapped_args: list[str] = self.map_cli_args(args)
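A side note on the `_ = stdin, suppress_stderr` line: assigning the unused parameters to `_` as a tuple is a conventional way to satisfy linters that flag unused arguments while keeping the signature compatible with the base class. For example:

    def execute_stub(stdin: object = None, suppress_stderr: bool = False) -> None:
        _ = stdin, suppress_stderr  # Both parameters are intentionally unused here.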
1 change: 1 addition & 0 deletions airbyte/_util/telemetry.py
@@ -169,6 +169,7 @@ class EventState(str, Enum):
     STARTED = "started"
     FAILED = "failed"
     SUCCEEDED = "succeeded"
+    CANCELED = "canceled"
 
 
 class EventType(str, Enum):
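Because EventState mixes in str, the new member compares and serializes as a plain string, which keeps telemetry payloads JSON-friendly without custom encoders. A quick sketch of that behavior:

    import json
    from enum import Enum

    class EventState(str, Enum):
        CANCELED = "canceled"

    assert EventState.CANCELED == "canceled"           # Plain string comparison works.
    print(json.dumps({"state": EventState.CANCELED}))  # {"state": "canceled"}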
7 changes: 7 additions & 0 deletions airbyte/datasets/_base.py
@@ -6,6 +6,8 @@
 
 from pandas import DataFrame
 
+from airbyte_protocol.models.airbyte_protocol import ConfiguredAirbyteStream
+
 from airbyte._util.document_rendering import DocumentRenderer
 from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE
 
@@ -75,3 +77,8 @@ def to_documents(
             render_metadata=render_metadata,
         )
         yield from renderer.render_documents(self)
+
+    @property
+    def column_names(self) -> list[str]:
+        """Return the list of top-level column names."""
+        return list(self._stream_metadata.stream.json_schema["properties"].keys())
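The new column_names property simply reads the top-level keys of the stream's JSON schema. An isolated sketch with a hand-built schema dict (the real property reads the same path off the stored ConfiguredAirbyteStream):

    stream_json_schema = {
        "type": "object",
        "properties": {
            "id": {"type": "integer"},
            "name": {"type": "string"},
            "updated_at": {"type": "string", "format": "date-time"},
        },
    }
    column_names = list(stream_json_schema["properties"].keys())
    print(column_names)  # ['id', 'name', 'updated_at']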
45 changes: 45 additions & 0 deletions airbyte/datasets/_inmemory.py
@@ -0,0 +1,45 @@
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+"""In-memory dataset class."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+from overrides import overrides
+
+from airbyte.datasets import DatasetBase
+
+
+if TYPE_CHECKING:
+    from collections.abc import Iterator
+
+    from airbyte_protocol.models import ConfiguredAirbyteStream
+
+
+class InMemoryDataset(DatasetBase):
+    """A dataset that is held in memory.
+
+    This dataset is useful for testing and debugging purposes, but should not be used with any
+    large datasets.
+    """
+
+    def __init__(
+        self,
+        records: list[dict[str, Any]],
+        stream_metadata: ConfiguredAirbyteStream,
+    ) -> None:
+        """Initialize the dataset with a list of records."""
+        # Should already be a list, but we convert it to a list just in case an iterator is passed.
+        self._records: list[dict[str, Any]] = list(records)
+        super().__init__(
+            stream_metadata=stream_metadata,
+        )
+
+    @overrides
+    def __iter__(self) -> Iterator[dict[str, Any]]:
+        """Return the iterator of records."""
+        return iter(self._records)
+
+    def __len__(self) -> int:
+        """Return the number of records in the dataset."""
+        return len(self._records)
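A usage sketch for the new class. The ConfiguredAirbyteStream construction below is an assumption about the airbyte_protocol model fields, so verify it against the installed protocol version:

    from airbyte_protocol.models import (
        AirbyteStream,
        ConfiguredAirbyteStream,
        DestinationSyncMode,
        SyncMode,
    )

    from airbyte.datasets._inmemory import InMemoryDataset

    stream_metadata = ConfiguredAirbyteStream(
        stream=AirbyteStream(
            name="users",
            json_schema={"type": "object", "properties": {"id": {"type": "integer"}}},
            supported_sync_modes=[SyncMode.full_refresh],
        ),
        sync_mode=SyncMode.full_refresh,
        destination_sync_mode=DestinationSyncMode.overwrite,
    )
    dataset = InMemoryDataset(records=[{"id": 1}, {"id": 2}], stream_metadata=stream_metadata)
    assert len(dataset) == 2
    assert list(dataset) == [{"id": 1}, {"id": 2}]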
40 changes: 39 additions & 1 deletion airbyte/datasets/_lazy.py
@@ -6,22 +6,31 @@
 from overrides import overrides
 
 from airbyte.datasets import DatasetBase
+from airbyte.datasets._inmemory import InMemoryDataset
 
 
 if TYPE_CHECKING:
+    import threading
     from collections.abc import Iterator, Mapping
 
     from airbyte_protocol.models import ConfiguredAirbyteStream
+
+    from airbyte import progress
 
 
 class LazyDataset(DatasetBase):
     """A dataset that is loaded incrementally from a source or a SQL query."""
 
     def __init__(
         self,
         iterator: Iterator[dict[str, Any]],
+        *,
         stream_metadata: ConfiguredAirbyteStream,
+        stop_event: threading.Event | None,
+        progress_tracker: progress.ProgressTracker,
     ) -> None:
+        self._stop_event: threading.Event | None = stop_event or None
+        self._progress_tracker = progress_tracker
         self._iterator: Iterator[dict[str, Any]] = iterator
         super().__init__(
             stream_metadata=stream_metadata,
@@ -32,4 +41,33 @@ def __iter__(self) -> Iterator[dict[str, Any]]:
         return self._iterator
 
     def __next__(self) -> Mapping[str, Any]:
-        return next(self._iterator)
+        try:
+            return next(self._iterator)
+        except StopIteration:
+            # The iterator is exhausted; tell the producer it can stop if it is
+            # still producing records (esp. when an artificial limit is reached).
+            self._progress_tracker.log_success()
+            if self._stop_event:
+                self._stop_event.set()
+            raise
+
+    def fetch_all(self) -> InMemoryDataset:
+        """Fetch all records to memory and return an InMemoryDataset."""
+        return InMemoryDataset(
+            records=list(self._iterator),
+            stream_metadata=self._stream_metadata,
+        )
+
+    def close(self) -> None:
+        """Stop the dataset iterator.
+
+        This method is used to signal the dataset to stop fetching records, for example
+        when the dataset is being fetched incrementally and the user wants to stop the
+        fetching process.
+        """
+        if self._stop_event:
+            self._stop_event.set()
+
+    def __del__(self) -> None:
+        """Close the dataset when the object is deleted."""
+        self.close()
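The stop_event plumbing lets the consuming side tell a producer thread to halt, whether the iterator is exhausted (`__next__`), explicitly closed (`close()`), or garbage-collected (`__del__`). A minimal producer/consumer handshake showing the same idea, independent of PyAirbyte's classes:

    import itertools
    import queue
    import threading

    stop_event = threading.Event()
    records: queue.Queue[dict | None] = queue.Queue(maxsize=10)

    def producer() -> None:
        for i in itertools.count():
            if stop_event.is_set():
                break  # The consumer has signaled that it has read enough.
            records.put({"id": i})
        records.put(None)  # Sentinel: no more records.

    thread = threading.Thread(target=producer)
    thread.start()

    for _ in range(3):  # Consume only three records, then stop the producer.
        print(records.get())
    stop_event.set()
    while records.get() is not None:  # Drain so a blocked put() can complete.
        pass
    thread.join()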
12 changes: 11 additions & 1 deletion airbyte/datasets/_sql.py
@@ -11,7 +11,12 @@
 
 from airbyte_protocol.models import ConfiguredAirbyteStream
 
-from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE
+from airbyte.constants import (
+    AB_EXTRACTED_AT_COLUMN,
+    AB_META_COLUMN,
+    AB_RAW_ID_COLUMN,
+    DEFAULT_ARROW_MAX_CHUNK_SIZE,
+)
 from airbyte.datasets._base import DatasetBase


@@ -135,6 +140,11 @@ def with_filter(self, *filter_expressions: ClauseElement | str) -> SQLDataset:
             query_statement=filtered_select,
         )
 
+    @property
+    def column_names(self) -> list[str]:
+        """Return the list of top-level column names, including internal Airbyte columns."""
+        return [*super().column_names, AB_RAW_ID_COLUMN, AB_EXTRACTED_AT_COLUMN, AB_META_COLUMN]
+
 
 class CachedDataset(SQLDataset):
     """A dataset backed by a SQL table cache.
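The override appends Airbyte's bookkeeping columns to the schema-derived names. The constant values below follow the `_airbyte_*` naming convention used by airbyte.constants; they are hardcoded here for illustration, so check the real constants before relying on them:

    AB_RAW_ID_COLUMN = "_airbyte_raw_id"
    AB_EXTRACTED_AT_COLUMN = "_airbyte_extracted_at"
    AB_META_COLUMN = "_airbyte_meta"

    schema_columns = ["id", "name", "updated_at"]  # What DatasetBase.column_names returns.
    column_names = [*schema_columns, AB_RAW_ID_COLUMN, AB_EXTRACTED_AT_COLUMN, AB_META_COLUMN]
    print(column_names)
    # ['id', 'name', 'updated_at', '_airbyte_raw_id', '_airbyte_extracted_at', '_airbyte_meta']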
28 changes: 26 additions & 2 deletions airbyte/progress.py
@@ -421,6 +421,22 @@ def _log_sync_start(self) -> None:
             event_type=EventType.SYNC,
         )
 
+    def _log_sync_cancel(self) -> None:
+        print(
+            f"Canceled `{self.job_description}` sync at `{ab_datetime_now().strftime('%H:%M:%S')}`."
+        )
+        self._send_telemetry(
+            state=EventState.CANCELED,
+            event_type=EventType.SYNC,
+        )
+
+    def _log_stream_read_start(self, stream_name: str) -> None:
+        print(
+            f"Read started on stream `{stream_name}` at "
+            f"`{ab_datetime_now().strftime('%H:%M:%S')}`..."
+        )
+        self.stream_read_start_times[stream_name] = time.time()
+
     def log_stream_start(self, stream_name: str) -> None:
         """Log that a stream has started reading."""
         if stream_name not in self.stream_read_start_times:
@@ -538,9 +554,17 @@ def log_success(
 
         self._update_display(force_refresh=True)
         self._stop_rich_view()
-        self._print_info_message(
+        streams = list(self.stream_read_start_times.keys())
+        if not streams:
+            streams_str = ""
+        elif len(streams) == 1:
+            streams_str = f" (`{streams[0]}` stream)"
+        else:
+            streams_str = f" ({len(streams)} streams)"
+
+        print(
             f"Completed `{self.job_description}` sync at "
-            f"`{ab_datetime_now().strftime('%H:%M:%S')}`."
+            f"`{ab_datetime_now().strftime('%H:%M:%S')}`{streams_str}."
         )
         self._log_read_metrics()
         self._send_telemetry(
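The completion message now appends a stream-count suffix with three branches: no streams, exactly one (named), or several (counted). Extracted into a pure function for clarity, as a sketch mirroring the inline logic above:

    def format_streams_suffix(stream_names: list[str]) -> str:
        """Mirror the branch logic in log_success: empty, singular, or plural suffix."""
        if not stream_names:
            return ""
        if len(stream_names) == 1:
            return f" (`{stream_names[0]}` stream)"
        return f" ({len(stream_names)} streams)"

    assert format_streams_suffix([]) == ""
    assert format_streams_suffix(["users"]) == " (`users` stream)"
    assert format_streams_suffix(["users", "orders"]) == " (2 streams)"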