Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 53 additions & 1 deletion airbyte/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ class ProgressStyle(Enum):
MAX_UPDATE_FREQUENCY = 5_000
"""The max number of records to read before updating the progress bar."""

# Compared against a stream's start-to-first-record delay: only when that delay is
# strictly greater than this value are the "*_adjusted" throughput metrics computed.
# The unit is seconds, per the constant's name.
TIME_TO_FIRST_RECORD_THRESHOLD_SECONDS = 10
"""Threshold for time_to_first_record above which adjusted metrics are calculated."""


def _to_time_str(timestamp: float) -> str:
"""Convert a timestamp float to a local time string.
Expand Down Expand Up @@ -191,6 +194,7 @@ def __init__(
# Stream reads
self.stream_read_counts: dict[str, int] = defaultdict(int)
self.stream_read_start_times: dict[str, float] = {}
self.stream_first_record_times: dict[str, float] = {}
self.stream_read_end_times: dict[str, float] = {}
self.stream_bytes_read: dict[str, int] = defaultdict(int)

Expand Down Expand Up @@ -278,6 +282,9 @@ def tally_records_read(
if message.record.stream not in self.stream_read_start_times:
self.log_stream_start(stream_name=message.record.stream)

if message.record.stream not in self.stream_first_record_times:
self.stream_first_record_times[message.record.stream] = time.time()

elif message.trace and message.trace.stream_status:
if message.trace.stream_status.status is AirbyteStreamStatus.STARTED:
self.log_stream_start(
Expand Down Expand Up @@ -470,9 +477,36 @@ def _job_info(self) -> dict[str, Any]:

return job_info

def _calculate_adjusted_metrics(
    self,
    stream_name: str,
    count: int,
    time_to_first_record: float,
    mb_read: float,
) -> dict[str, float]:
    """Return throughput metrics measured from the first record instead of stream start.

    The adjusted figures are produced only when ``time_to_first_record`` is strictly
    greater than ``TIME_TO_FIRST_RECORD_THRESHOLD_SECONDS`` and both a first-record
    time and a read-end time have been recorded for ``stream_name``.

    Args:
        stream_name: Key used to look up the stream's recorded timestamps.
        count: Record count used as the numerator of the records-per-second rate.
        time_to_first_record: Delay that is compared against the threshold.
        mb_read: Megabytes read; used only when ``self.bytes_tracking_enabled`` is truthy.

    Returns:
        A dict that is empty when the conditions above are not met or the adjusted
        duration is not positive. Otherwise it holds ``records_per_second_adjusted``
        and, when byte tracking is enabled, ``mb_per_second_adjusted``; both values
        are rounded to 4 decimal places.
    """
    # Guard 1: the wait for the first record was not long enough to matter.
    if not (time_to_first_record > TIME_TO_FIRST_RECORD_THRESHOLD_SECONDS):
        return {}

    # Guard 2: both timestamps are needed to measure the adjusted window.
    if (
        stream_name not in self.stream_first_record_times
        or stream_name not in self.stream_read_end_times
    ):
        return {}

    read_finished_at = self.stream_read_end_times[stream_name]
    first_record_at = self.stream_first_record_times[stream_name]
    window_seconds = read_finished_at - first_record_at

    # Guard 3: a zero or negative window cannot be used as a divisor.
    if not (window_seconds > 0):
        return {}

    result = {
        "records_per_second_adjusted": round(count / window_seconds, 4),
    }
    if self.bytes_tracking_enabled:
        result["mb_per_second_adjusted"] = round(mb_read / window_seconds, 4)
    return result

def _log_read_metrics(self) -> None:
"""Log read performance metrics."""
# Source performance metrics
if not self.total_records_read or not self._file_logger:
return

Expand Down Expand Up @@ -502,6 +536,18 @@ def _log_read_metrics(self) -> None:
"read_start_time": self.stream_read_start_times.get(stream_name),
"read_end_time": self.stream_read_end_times.get(stream_name),
}

time_to_first_record = None
if (
stream_name in self.stream_first_record_times
and stream_name in self.stream_read_start_times
):
time_to_first_record = (
self.stream_first_record_times[stream_name]
- self.stream_read_start_times[stream_name]
)
stream_metrics[stream_name]["time_to_first_record"] = time_to_first_record

if (
stream_name in self.stream_read_end_times
and stream_name in self.stream_read_start_times
Expand All @@ -526,6 +572,12 @@ def _log_read_metrics(self) -> None:
stream_metrics[stream_name]["mb_read"] = mb_read
stream_metrics[stream_name]["mb_per_second"] = round(mb_read / duration, 4)

if time_to_first_record is not None:
adjusted_metrics = self._calculate_adjusted_metrics(
stream_name, count, time_to_first_record, mb_read
)
stream_metrics[stream_name].update(adjusted_metrics)

perf_metrics["stream_metrics"] = stream_metrics
log_dict["performance_metrics"] = perf_metrics

Expand Down
Loading