Skip to content

Conversation

aaronsteers
Copy link
Contributor

@aaronsteers aaronsteers commented Oct 9, 2025

feat: Add time_to_first_record metric and adjusted performance stats

Summary

Adds time_to_first_record tracking per stream to help distinguish between connector initialization latency and actual data processing speed. When the time to first record exceeds 10 seconds, additional adjusted throughput metrics (records_per_second_adjusted, mb_per_second_adjusted) are calculated using the first-record timestamp instead of the stream-start timestamp.

Key changes:

  • Track stream_first_record_times dict alongside existing stream_read_start_times
  • Calculate time_to_first_record = first_record_time - stream_start_time
  • Add conditional adjusted metrics when time_to_first_record > 10 seconds
  • Existing metrics (records_per_second, mb_per_second) remain unchanged

Example output:

"stream_metrics": {
  "users": {
    "records_read": 1000,
    "time_to_first_record": 15.2,
    "records_per_second": 45.2,
    "records_per_second_adjusted": 67.8,
    "mb_per_second": 2.1,
    "mb_per_second_adjusted": 3.1
  }
}

Review & Testing Checklist for Human

  • Verify timing logic correctness: Test with real connectors to ensure time_to_first_record values make sense and adjusted metrics only appear when threshold exceeded
  • Test edge cases: Verify behavior with very fast/slow connectors, missing timestamps, streams that fail before producing records
  • Validate performance impact: Monitor that additional timestamp tracking doesn't measurably slow down record processing
  • Check metric format: Confirm new metrics integrate properly with existing logging infrastructure and downstream consumers

Notes

Requested by AJ Steers (@aaronsteers) in Devin session: https://app.devin.ai/sessions/c77d07731aad483da4af06da3058d26e

The 10-second threshold was chosen as the point where initialization delay becomes significant enough to warrant adjusted throughput calculations. All existing unit tests pass, but this PR would benefit from integration testing with real connectors to validate the timing logic works correctly in practice.

Summary by CodeRabbit

  • New Features

    • Added per-stream “time to first record” tracking.
    • Introduced adjusted throughput metrics that account for delays before the first record.
    • When byte tracking is enabled, logs include adjusted MB/s alongside existing metrics.
  • Improvements

    • More accurate and informative per-stream performance reporting in logs and metrics output.
    • Conditional calculation ensures adjusted metrics appear only when relevant, keeping logs concise.

Important

Auto-merge enabled.

This PR is set to merge automatically when all requirements are met.

Note

Auto-merge may have been disabled. Please check the PR status to confirm.

- Track time_to_first_record per stream (time from stream start to first record)
- Add records_per_second_adjusted and mb_per_second_adjusted metrics
- Adjusted metrics only calculated when time_to_first_record > 10 seconds
- Adjusted metrics use first-record time instead of stream-start time
- Helps distinguish connector initialization slowness from data processing speed

Co-Authored-By: AJ Steers <[email protected]>
Copy link
Contributor

Original prompt from AJ Steers
Received message in Slack channel #ask-devin-ai:

@Devin - update perf stats tracking in PyAirbyte to record time_to_first_record per stream, which is the difference between the time we get the steam start event and when we see the first record guy that stream emitted.
Thread URL: https://airbytehq-team.slack.com/archives/C08BHPUMEPJ/p1760042022256009?thread_ts=1760042022.256009

Copy link
Contributor

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

Copy link

github-actions bot commented Oct 9, 2025

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1760042608-time-to-first-record-tracking' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1760042608-time-to-first-record-tracking'

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /fix-pr - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test-pr - Runs tests with the updated PyAirbyte

Community Support

Questions? Join the #pyairbyte channel in our Slack workspace.

📝 Edit this welcome message.

Copy link
Contributor

coderabbitai bot commented Oct 9, 2025

📝 Walkthrough

Walkthrough

Adds per-stream tracking of time-to-first-record and introduces adjusted throughput metrics when the first-record delay exceeds a new threshold. Extends ProgressTracker with a new constant, state, and method; updates record tallying and logging to compute and include adjusted MB/s and related metrics conditionally.

Changes

Cohort / File(s) Summary
Progress tracking metrics updates
airbyte/progress.py
Added TIME_TO_FIRST_RECORD_THRESHOLD_SECONDS; introduced stream_first_record_times in ProgressTracker; added _calculate_adjusted_metrics; initialized first-record timestamps in tally_records_read; updated _log_read_metrics to compute per-stream time_to_first_record and conditionally include adjusted throughput (including mb_per_second_adjusted when bytes are tracked).

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant SRC as Source Connector
    participant PT as ProgressTracker
    participant LOG as Logger

    rect rgb(235, 245, 255)
    note right of SRC: Read loop emits records per stream
    SRC->>PT: tally_records_read(stream, count, bytes)
    alt first record for stream
        PT->>PT: stream_first_record_times[stream] = now
    end
    end

    PT->>PT: _log_read_metrics()
    PT->>PT: Compute time_to_first_record per stream
    alt time_to_first_record > THRESHOLD and timestamps exist
        PT->>PT: _calculate_adjusted_metrics(stream, count, tffr, mb_read)
        PT->>LOG: Emit stream_metrics with adjusted throughput
    else
        PT->>LOG: Emit stream_metrics without adjusted throughput
    end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

Suggested labels

enable-ai-review

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title succinctly describes the primary enhancement by stating the addition of a time_to_first_record metric and adjusted performance statistics and aligns directly with the changes in the PR.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch devin/1760042608-time-to-first-record-tracking

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
airbyte/progress.py (1)

561-579: Fix UnboundLocalError when bytes tracking is disabled.

At line 577, mb_read is passed to _calculate_adjusted_metrics, but mb_read is only defined within the if self.bytes_tracking_enabled: block at lines 570-573. When bytes tracking is disabled, this will raise an UnboundLocalError.

Apply this diff to fix the issue:

                 stream_metrics[stream_name]["records_per_second"] = round(
                     count
                     / (
                         self.stream_read_end_times[stream_name]
                         - self.stream_read_start_times[stream_name]
                     ),
                     4,
                 )
+                mb_read = 0.0
                 if self.bytes_tracking_enabled:
                     mb_read = self.stream_bytes_read[stream_name] / 1_000_000
                     stream_metrics[stream_name]["mb_read"] = mb_read
                     stream_metrics[stream_name]["mb_per_second"] = round(mb_read / duration, 4)

                 if time_to_first_record is not None:
                     adjusted_metrics = self._calculate_adjusted_metrics(
                         stream_name, count, time_to_first_record, mb_read
                     )
                     stream_metrics[stream_name].update(adjusted_metrics)

This ensures mb_read is always defined before being passed to _calculate_adjusted_metrics. The method already checks self.bytes_tracking_enabled internally before using mb_read, so passing 0.0 when bytes tracking is disabled is safe.

🧹 Nitpick comments (1)
airbyte/progress.py (1)

480-506: Consider simplifying the validation logic, wdyt?

The checks at lines 491-492 are redundant since the caller at lines 552 and 542-543 already verifies these conditions before calling this method. If these are intended as defensive guards, that's fine, but documenting them as such would help clarify the intent.

You could simplify to:

 def _calculate_adjusted_metrics(
     self,
     stream_name: str,
     count: int,
     time_to_first_record: float,
     mb_read: float,
 ) -> dict[str, float]:
-    """Calculate adjusted performance metrics when time_to_first_record exceeds threshold."""
+    """Calculate adjusted performance metrics when time_to_first_record exceeds threshold.
+    
+    Preconditions (validated by caller):
+    - stream_name exists in stream_first_record_times
+    - stream_name exists in stream_read_end_times
+    - time_to_first_record > TIME_TO_FIRST_RECORD_THRESHOLD_SECONDS
+    """
     adjusted_metrics = {}
-    if (
-        time_to_first_record > TIME_TO_FIRST_RECORD_THRESHOLD_SECONDS
-        and stream_name in self.stream_first_record_times
-        and stream_name in self.stream_read_end_times
-    ):
-        adjusted_duration = (
-            self.stream_read_end_times[stream_name]
-            - self.stream_first_record_times[stream_name]
-        )
-        if adjusted_duration > 0:
-            adjusted_metrics["records_per_second_adjusted"] = round(
-                count / adjusted_duration, 4
-            )
-            if self.bytes_tracking_enabled:
-                adjusted_metrics["mb_per_second_adjusted"] = round(
-                    mb_read / adjusted_duration, 4
-                )
+    adjusted_duration = (
+        self.stream_read_end_times[stream_name]
+        - self.stream_first_record_times[stream_name]
+    )
+    if adjusted_duration > 0:
+        adjusted_metrics["records_per_second_adjusted"] = round(
+            count / adjusted_duration, 4
+        )
+        if self.bytes_tracking_enabled:
+            adjusted_metrics["mb_per_second_adjusted"] = round(
+                mb_read / adjusted_duration, 4
+            )
     return adjusted_metrics
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 99d2ccf and 26ddc02.

📒 Files selected for processing (1)
  • airbyte/progress.py (6 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Pytest (All, Python 3.10, Windows)
  • GitHub Check: Pytest (All, Python 3.11, Windows)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (No Creds)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (4)
airbyte/progress.py (4)

108-109: Nice addition!

The constant is well-named and clearly documents its purpose. The 10-second threshold seems reasonable for distinguishing initialization overhead from steady-state throughput.


197-197: LGTM!

The new tracking dictionary follows the established pattern and fits naturally alongside stream_read_start_times and stream_read_end_times.


285-286: Solid implementation!

The logic correctly captures the timestamp of the first record for each stream without overwriting subsequent records. The placement after the stream start check is appropriate.


540-549: Clean implementation!

The computation correctly handles the case where timestamps might be missing by setting time_to_first_record to None, which is then properly checked before calculating adjusted metrics.

@aaronsteers aaronsteers marked this pull request as ready for review October 9, 2025 20:52
@aaronsteers aaronsteers enabled auto-merge (squash) October 9, 2025 20:52
Copy link

github-actions bot commented Oct 9, 2025

PyTest Results (Fast Tests Only, No Creds)

304 tests  ±0   304 ✅ ±0   6m 2s ⏱️ + 1m 47s
  1 suites ±0     0 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit 26ddc02. ± Comparison against base commit 99d2ccf.

@aaronsteers aaronsteers disabled auto-merge October 9, 2025 20:52
@aaronsteers aaronsteers changed the title feat: Add time_to_first_record metric and adjusted performance stats (do not merge) feat: Add time_to_first_record metric and adjusted performance stats Oct 9, 2025
@aaronsteers aaronsteers changed the title feat: Add time_to_first_record metric and adjusted performance stats feat: Add time_to_first_record metric and adjusted performance stats Oct 9, 2025
Copy link

github-actions bot commented Oct 9, 2025

PyTest Results (Full)

368 tests  ±0   352 ✅ ±0   19m 48s ⏱️ -50s
  1 suites ±0    16 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit 26ddc02. ± Comparison against base commit 99d2ccf.

@aaronsteers aaronsteers merged commit 7313e98 into main Oct 9, 2025
27 of 28 checks passed
@aaronsteers aaronsteers deleted the devin/1760042608-time-to-first-record-tracking branch October 9, 2025 22:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant