feat: Add stream previews for sources #725
Conversation
Pull Request Overview
This PR adds stream preview functionality to PyAirbyte sources, enabling developers to quickly inspect sample records from data streams. The feature includes both programmatic access via the new get_samples() method and formatted console output via print_samples().
Key changes:
- New methods get_samples() and print_samples() added to the Source class for retrieving and displaying sample records (see the usage sketch below)
- Enhanced dataset classes with support for limiting records and early termination via threading events
- Added example scripts demonstrating the new functionality with Faker and GitHub sources
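For reference, a minimal usage sketch of the new API. The method names come from the overview above and the parameter names (streams, limit) from the sequence diagram below; the connector config values and stream names are illustrative, so treat the exact signature as approximate:

```python
import airbyte as ab

# Hypothetical Faker config; any small record count works for a quick preview.
source = ab.get_source(
    "source-faker",
    config={"count": 1_000},
    install_if_missing=True,
)
source.check()

# Programmatic access: a small in-memory sample per stream.
samples = source.get_samples(streams="*", limit=10)

# Formatted console output: one Rich table per stream.
source.print_samples(streams=["products", "users"], limit=10)
```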
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.
File | Description |
---|---|
airbyte/sources/base.py | Core implementation of sample preview functionality with Rich table formatting |
airbyte/datasets/_lazy.py | Enhanced LazyDataset with stop events and fetch_all() method |
airbyte/datasets/_inmemory.py | New InMemoryDataset class for holding records in memory |
airbyte/datasets/_base.py | Added column_names property to base dataset class |
airbyte/datasets/_sql.py | Enhanced SQLDataset with column_names property including internal columns |
airbyte/progress.py | Added sync cancellation tracking and improved completion messages |
airbyte/_util/telemetry.py | Added CANCELED state for telemetry events |
examples/run_faker_samples.py | Example script demonstrating sample preview with Faker source |
examples/run_github_samples.py | Example script demonstrating sample preview with GitHub source |
📝 Walkthrough
This update introduces new sampling and record retrieval features to the Airbyte source interface, including sample printing utilities and enhanced interruption controls. It adds an in-memory dataset class, extends the dataset base and SQL dataset classes with column name properties, and improves telemetry and progress tracking. Two example scripts demonstrate the new sampling capabilities.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant Source
    participant LazyDataset
    participant InMemoryDataset
    User->>Source: get_samples(streams, limit, on_error)
    Source->>Source: resolve streams
    loop For each stream
        Source->>Source: get_records(stream, limit, stop_event)
        Source->>LazyDataset: construct with iterator, stop_event, progress_tracker
        LazyDataset->>InMemoryDataset: fetch_all()
        Source->>User: return InMemoryDataset or None
    end
    User->>Source: print_samples(streams, limit, on_error)
    Source->>User: prints formatted tables
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
Does this look good to you? Would you like me to help draft some usage notes or suggest tests for the new sampling and interruption features? Wdyt?
Actionable comments posted: 1
🔭 Outside diff range comments (1)
airbyte/sources/base.py (1)
657-700: Potential issue with cancellation timing
I notice the stop_event check happens after yield from progress_tracker.tally_records_read(message_generator) completes (lines 697-699). This means cancellation only takes effect after ALL messages have been processed, not during the iteration. For true mid-stream cancellation, wouldn't you need to check the stop_event inside the iteration loop? Maybe something like:

```python
for message in progress_tracker.tally_records_read(message_generator):
    if stop_event and stop_event.is_set():
        progress_tracker._log_sync_cancel()
        return
    yield message
```

wdyt? Or is the current behavior intentional?
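As a standalone illustration of that per-message check (names here are generic, not PyAirbyte code):

```python
import threading
from collections.abc import Iterator


def read_with_cancellation(messages: Iterator[str], stop_event: threading.Event) -> Iterator[str]:
    """Yield messages until the stop event is set; the check runs once per message."""
    for message in messages:
        if stop_event.is_set():
            return  # Stop mid-stream instead of draining the iterator first.
        yield message


stop = threading.Event()
reader = read_with_cancellation(iter(["a", "b", "c", "d"]), stop)
print(next(reader))   # "a"
stop.set()
print(list(reader))   # [] - cancellation takes effect on the very next iteration
```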
🧹 Nitpick comments (1)
airbyte/datasets/_lazy.py (1)
71-73: Consider adding context manager support?
While __del__ helps with cleanup, it's not always reliable (circular refs, interpreter shutdown, etc.). Have you considered also implementing the context manager protocol (__enter__/__exit__)? This would allow users to ensure cleanup with with statements for more predictable resource management, wdyt?

```diff
+    def __enter__(self) -> LazyDataset:
+        """Enter the context manager."""
+        return self
+
+    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
+        """Exit the context manager and close the dataset."""
+        self.close()
+
     def __del__(self) -> None:
         """Close the dataset when the object is deleted."""
         self.close()
```
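For illustration, a minimal stand-in class showing how the suggested protocol guarantees cleanup from a with block (ClosingDataset is hypothetical, not the real LazyDataset):

```python
from collections.abc import Iterator
from types import TracebackType


class ClosingDataset:
    """Toy dataset demonstrating the __enter__/__exit__ pattern suggested above."""

    def __init__(self, records: list[dict]) -> None:
        self._records = records
        self.closed = False

    def __iter__(self) -> Iterator[dict]:
        return iter(self._records)

    def close(self) -> None:
        self.closed = True

    def __enter__(self) -> "ClosingDataset":
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.close()


with ClosingDataset([{"id": 1}, {"id": 2}]) as ds:
    for row in ds:
        print(row)
assert ds.closed  # cleanup ran deterministically, without relying on __del__
```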
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- airbyte/_util/telemetry.py (1 hunks)
- airbyte/datasets/_base.py (2 hunks)
- airbyte/datasets/_inmemory.py (1 hunks)
- airbyte/datasets/_lazy.py (2 hunks)
- airbyte/datasets/_sql.py (2 hunks)
- airbyte/progress.py (2 hunks)
- airbyte/sources/base.py (8 hunks)
- examples/run_faker_samples.py (1 hunks)
- examples/run_github_samples.py (1 hunks)
📓 Path-based instructions (1)
examples/*
⚙️ CodeRabbit Configuration File
These scripts are intended for demonstration purposes only. They are not meant to represent production code. For these scripts, you should prefer brevity over robust error handling and handling of edge cases. These are demos - which should be as simple as possible to handle the 'blessed' use cases.
Files:
examples/run_faker_samples.py
examples/run_github_samples.py
🧠 Learnings (6)
📓 Common learnings
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#411
File: airbyte/sources/util.py:121-123
Timestamp: 2024-10-08T15:34:31.026Z
Learning: In `airbyte/sources/util.py`, the function `get_benchmark_source` is intended for benchmarking purposes in the current iteration. Future plans may include adding more generic functions like `get_dummy_source` or `get_mock_source` with broader functionality.
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#411
File: airbyte/sources/util.py:121-123
Timestamp: 2024-10-06T17:35:27.056Z
Learning: In `airbyte/sources/util.py`, the function `get_benchmark_source` is intended for benchmarking purposes in the current iteration. Future plans may include adding more generic functions like `get_dummy_source` or `get_mock_source` with broader functionality.
examples/run_faker_samples.py (4)
Learnt from: aaronsteers
PR: #411
File: airbyte/sources/util.py:121-123
Timestamp: 2024-10-08T15:34:31.026Z
Learning: In airbyte/sources/util.py, the function get_benchmark_source is intended for benchmarking purposes in the current iteration. Future plans may include adding more generic functions like get_dummy_source or get_mock_source with broader functionality.
Learnt from: aaronsteers
PR: #411
File: airbyte/sources/util.py:121-123
Timestamp: 2024-10-06T17:35:27.056Z
Learning: In airbyte/sources/util.py, the function get_benchmark_source is intended for benchmarking purposes in the current iteration. Future plans may include adding more generic functions like get_dummy_source or get_mock_source with broader functionality.
Learnt from: aaronsteers
PR: #347
File: tests/integration_tests/fixtures/registry.json:48-48
Timestamp: 2024-10-08T15:34:31.026Z
Learning: Test fixtures in the PyAirbyte project do not need to align with real Docker repositories.
Learnt from: aaronsteers
PR: #347
File: tests/integration_tests/fixtures/registry.json:48-48
Timestamp: 2024-08-31T01:20:08.405Z
Learning: Test fixtures in the PyAirbyte project do not need to align with real Docker repositories.
airbyte/datasets/_base.py (3)
Learnt from: Suraj-Vishwakarma70
PR: #285
File: airbyte/sources/base.py:0-0
Timestamp: 2024-07-09T06:42:41.304Z
Learning: Ensure consistent naming for attributes in the Source class in airbyte/sources/base.py, such as renaming _to_be_selected_stream to _to_be_selected_streams.
Learnt from: Suraj-Vishwakarma70
PR: #285
File: airbyte/sources/base.py:0-0
Timestamp: 2024-10-08T15:34:31.026Z
Learning: Ensure consistent naming for attributes in the Source class in airbyte/sources/base.py, such as renaming _to_be_selected_stream to _to_be_selected_streams.
Learnt from: aaronsteers
PR: #281
File: airbyte/caches/bigquery.py:40-43
Timestamp: 2024-10-08T15:34:31.026Z
Learning: The BigQueryCache.get_arrow_dataset method should have a docstring that correctly states the reason for the NotImplementedError as BigQuery not supporting to_arrow, instead of incorrectly mentioning pd.read_sql_table.
examples/run_github_samples.py (4)
Learnt from: aaronsteers
PR: #347
File: tests/integration_tests/fixtures/registry.json:48-48
Timestamp: 2024-10-08T15:34:31.026Z
Learning: Test fixtures in the PyAirbyte project do not need to align with real Docker repositories.
Learnt from: aaronsteers
PR: #347
File: tests/integration_tests/fixtures/registry.json:48-48
Timestamp: 2024-08-31T01:20:08.405Z
Learning: Test fixtures in the PyAirbyte project do not need to align with real Docker repositories.
Learnt from: aaronsteers
PR: #411
File: airbyte/sources/util.py:121-123
Timestamp: 2024-10-08T15:34:31.026Z
Learning: In airbyte/sources/util.py, the function get_benchmark_source is intended for benchmarking purposes in the current iteration. Future plans may include adding more generic functions like get_dummy_source or get_mock_source with broader functionality.
Learnt from: aaronsteers
PR: #411
File: airbyte/sources/util.py:121-123
Timestamp: 2024-10-06T17:35:27.056Z
Learning: In airbyte/sources/util.py, the function get_benchmark_source is intended for benchmarking purposes in the current iteration. Future plans may include adding more generic functions like get_dummy_source or get_mock_source with broader functionality.
airbyte/datasets/_sql.py (2)
Learnt from: Suraj-Vishwakarma70
PR: #285
File: airbyte/sources/base.py:0-0
Timestamp: 2024-10-08T15:34:31.026Z
Learning: Ensure consistent naming for attributes in the Source class in airbyte/sources/base.py, such as renaming _to_be_selected_stream to _to_be_selected_streams.
Learnt from: Suraj-Vishwakarma70
PR: #285
File: airbyte/sources/base.py:0-0
Timestamp: 2024-07-09T06:42:41.304Z
Learning: Ensure consistent naming for attributes in the Source class in airbyte/sources/base.py, such as renaming _to_be_selected_stream to _to_be_selected_streams.
airbyte/sources/base.py (4)
Learnt from: Suraj-Vishwakarma70
PR: #285
File: airbyte/sources/base.py:0-0
Timestamp: 2024-07-09T06:42:41.304Z
Learning: Ensure consistent naming for attributes in the Source class in airbyte/sources/base.py, such as renaming _to_be_selected_stream to _to_be_selected_streams.
Learnt from: Suraj-Vishwakarma70
PR: #285
File: airbyte/sources/base.py:0-0
Timestamp: 2024-10-08T15:34:31.026Z
Learning: Ensure consistent naming for attributes in the Source class in airbyte/sources/base.py, such as renaming _to_be_selected_stream to _to_be_selected_streams.
Learnt from: aaronsteers
PR: #411
File: airbyte/sources/util.py:121-123
Timestamp: 2024-10-06T17:35:27.056Z
Learning: In airbyte/sources/util.py, the function get_benchmark_source is intended for benchmarking purposes in the current iteration. Future plans may include adding more generic functions like get_dummy_source or get_mock_source with broader functionality.
Learnt from: aaronsteers
PR: #411
File: airbyte/sources/util.py:121-123
Timestamp: 2024-10-08T15:34:31.026Z
Learning: In airbyte/sources/util.py, the function get_benchmark_source is intended for benchmarking purposes in the current iteration. Future plans may include adding more generic functions like get_dummy_source or get_mock_source with broader functionality.
🧬 Code Graph Analysis (8)
examples/run_faker_samples.py (1)
- airbyte/sources/base.py (1): print_samples (567-638)
airbyte/datasets/_base.py (1)
- airbyte/datasets/_sql.py (1): column_names (145-147)
examples/run_github_samples.py (1)
- airbyte/sources/base.py (2): get_available_streams (231-233), print_samples (567-638)
airbyte/progress.py (1)
- airbyte/_util/telemetry.py (3): send_telemetry (275-325), EventState (174-178), EventType (181-185)
airbyte/datasets/_inmemory.py (1)
- airbyte/datasets/_base.py (1): DatasetBase (25-84)
airbyte/datasets/_sql.py (1)
- airbyte/datasets/_base.py (1): column_names (82-84)
airbyte/sources/base.py (3)
- airbyte/datasets/_inmemory.py (1): InMemoryDataset (19-45)
- airbyte/datasets/_lazy.py (2): LazyDataset (21-73), fetch_all (54-59)
- airbyte/progress.py (1): _log_sync_cancel (335-343)
airbyte/datasets/_lazy.py (2)
- airbyte/datasets/_inmemory.py (1): InMemoryDataset (19-45)
- airbyte/progress.py (1): log_success (355-385)
🔇 Additional comments (18)
airbyte/_util/telemetry.py (1)
178-178: LGTM! Clean enum extension for cancellation tracking.
The addition of the CANCELED state follows the existing pattern and naming convention perfectly. This will enable proper telemetry tracking for canceled sync operations.
examples/run_faker_samples.py (1)
1-18: Perfect example script! Clean and focused demonstration.
This script excellently demonstrates the new sampling functionality with just the right amount of brevity for an example. The 200k record count provides a good test case while the simple structure makes it easy to understand. The usage instructions are clear and helpful.
airbyte/datasets/_sql.py (2)
15-20: Clean import expansion for the new constants.
The addition of the Airbyte-specific column constants is well-organized and focused on exactly what's needed for the new functionality.
144-147: Excellent property implementation! Nice use of super().
The column_names property elegantly extends the base class functionality by adding the internal Airbyte columns. The spread operator usage is clean and the docstring clearly explains the purpose. This will work great with the new sampling features, wdyt?
airbyte/datasets/_base.py (2)
11-11: Good import addition for type consistency.
Adding the ConfiguredAirbyteStream import maintains consistency with other dataset classes and supports proper type annotations.
81-84: Solid foundation for column name access across datasets.
The column_names property provides a clean interface for extracting column names from the JSON schema. This base implementation will work perfectly with the extensions in SQLDataset and support the new sampling functionality nicely, wdyt?
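A rough sketch of the property pattern these two comments describe - a base class reading column names from the stream's JSON schema, and a subclass appending the internal Airbyte columns via super(). The class names and constants below are illustrative, not the actual PyAirbyte implementation:

```python
# Typical Airbyte metadata column names, shown here as plain literals.
AB_RAW_ID_COLUMN = "_airbyte_raw_id"
AB_EXTRACTED_AT_COLUMN = "_airbyte_extracted_at"
AB_META_COLUMN = "_airbyte_meta"


class DatasetBaseSketch:
    def __init__(self, json_schema: dict) -> None:
        self._json_schema = json_schema

    @property
    def column_names(self) -> list[str]:
        """Column names as declared in the stream's JSON schema."""
        return list(self._json_schema.get("properties", {}).keys())


class SQLDatasetSketch(DatasetBaseSketch):
    @property
    def column_names(self) -> list[str]:
        """Schema columns plus the internal Airbyte metadata columns."""
        return [
            *super().column_names,
            AB_RAW_ID_COLUMN,
            AB_EXTRACTED_AT_COLUMN,
            AB_META_COLUMN,
        ]


ds = SQLDatasetSketch({"properties": {"id": {}, "name": {}}})
print(ds.column_names)
# ['id', 'name', '_airbyte_raw_id', '_airbyte_extracted_at', '_airbyte_meta']
```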
examples/run_github_samples.py (3)
1-16: Great example setup with proper authentication handling.
The script structure is clean and the use of ab.get_secret() for the GitHub token is a good practice for examples. The usage instructions are helpful and the setup notes are practical.
18-25: Smart approach to handle known failing streams.
Having a predefined list of failing streams is a practical solution for a demo script. The inline comment explaining the specific error for issue_timeline_events is helpful for understanding, wdyt?
27-39: Excellent demonstration of the sampling feature.
The source configuration is clean and the stream filtering logic using set operations is elegant. The final print_samples() call nicely demonstrates the new functionality with the filtered streams. This example perfectly balances simplicity with real-world complexity, wdyt?
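For reference, a condensed sketch of what the example does - drop known-failing streams with set operations, then preview the rest. Config keys and stream names are approximate; the real script lives in examples/run_github_samples.py:

```python
import airbyte as ab

FAILING_STREAMS = {"issue_timeline_events"}  # known-bad streams skipped in the demo

source = ab.get_source(
    "source-github",
    config={
        "repositories": ["airbytehq/PyAirbyte"],
        "credentials": {"personal_access_token": ab.get_secret("GITHUB_PERSONAL_ACCESS_TOKEN")},
    },
)
streams = set(source.get_available_streams()) - FAILING_STREAMS
source.print_samples(streams=sorted(streams), limit=5)
```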
airbyte/datasets/_inmemory.py (1)
19-45: Clean implementation of InMemoryDataset! 👍
The class is well-designed for its intended testing/debugging purpose. I particularly like the defensive conversion to list in the constructor - smart move to handle iterator inputs gracefully.
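A tiny illustration of that defensive-copy idea (InMemoryRecords is a made-up stand-in, not the real InMemoryDataset):

```python
from collections.abc import Iterable, Iterator


class InMemoryRecords:
    def __init__(self, records: Iterable[dict]) -> None:
        # Accept any iterable - including a one-shot iterator - and pin it down
        # as a list so the dataset can be iterated and len()-ed repeatedly.
        self._records: list[dict] = list(records)

    def __iter__(self) -> Iterator[dict]:
        return iter(self._records)

    def __len__(self) -> int:
        return len(self._records)


ds = InMemoryRecords(iter([{"id": 1}, {"id": 2}]))
print(len(ds), list(ds))  # 2 [{'id': 1}, {'id': 2}] - safe to re-iterate
```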
airbyte/progress.py (2)
335-343: Good addition of cancellation logging! ✨
The _log_sync_cancel method follows the established pattern of other logging methods perfectly. Nice consistency with the telemetry integration.
366-377: Nice enhancement to show stream information in completion message! 🎯
The dynamic stream info makes the output much more informative. One quick question though - by using stream_read_start_times.keys(), are we assuming all started streams completed successfully? What happens if a stream starts but errors out midway - would it still appear in this list? Just wondering if this could be misleading in error scenarios, wdyt?
airbyte/datasets/_lazy.py (3)
24-37: Constructor enhancement looks good! 👌
Nice addition of the stop_event and progress_tracker parameters. The or None normalization for stop_event is a good defensive practice.
54-59: Nice addition of fetch_all method! 🚀
Clean way to materialize the remaining records into memory. Users should just be aware that this exhausts the underlying iterator, but that's expected behavior for this pattern.
61-69: Good addition of explicit close method! ✅
Clean implementation with proper null checking. The docstring clearly explains the use case.
airbyte/sources/base.py (3)
422-505: Excellent enhancement to get_records with limit and cancellation support! 🎉
The implementation is clean and well-integrated. Good thinking on:
- Creating a default Event if none provided
- Using islice for efficient limiting (see the sketch below)
- Passing the stop_event through the entire chain
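A self-contained sketch of the limit-plus-stop-event combination described above (illustrative only, not the actual get_records implementation):

```python
import threading
from collections.abc import Iterable, Iterator
from itertools import islice


def limited_records(
    records: Iterable[int],
    limit: int | None = None,
    stop_event: threading.Event | None = None,
) -> Iterator[int]:
    """Cap the number of yielded records and honor an optional stop event."""
    stop_event = stop_event or threading.Event()
    iterator: Iterator[int] = iter(records)
    if limit is not None:
        iterator = islice(iterator, limit)  # efficient truncation, no buffering
    for record in iterator:
        if stop_event.is_set():
            return
        yield record


print(list(limited_records(range(1_000_000), limit=3)))  # [0, 1, 2]
```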
531-565: Great addition of get_samples method! 🔍
I like the flexible error handling modes and the clean implementation. One question though - I notice you're setting the stop_event after fetch_all() completes (line 553). Since fetch_all() consumes all records, wouldn't the stop_event be too late to actually stop anything? Or is this just for cleanup purposes? wdyt?
567-639: Fantastic implementation of print_samples! 🎨
This is really well thought out:
- Smart filtering of internal Airbyte columns
- Clever pivoting for wide tables (great UX consideration! - see the sketch below)
- Beautiful formatting with rich
- Proper error handling
The attention to user experience here is excellent!
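A small self-contained sketch of the pivoting idea - when records are wider than a column limit, render one table row per column instead. It uses plain rich tables and is not the actual print_samples code:

```python
from rich.console import Console
from rich.table import Table


def print_sample_table(records: list[dict], col_limit: int = 10) -> None:
    console = Console()
    columns = list(records[0].keys()) if records else []
    table = Table(title="sample")
    if len(columns) > col_limit:
        # Pivot: one row per column, one column per record.
        table.add_column("Column Name")
        for _ in records:
            table.add_column(overflow="fold")
        for col in columns:
            table.add_row(col, *[str(record[col]) for record in records])
    else:
        # Normal layout: one column per field, one row per record.
        for col in columns:
            table.add_column(col)
        for record in records:
            table.add_row(*[str(record[col]) for col in columns])
    console.print(table)


print_sample_table([{f"col_{i}": i for i in range(15)} for _ in range(2)])
```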
Actionable comments posted: 0
♻️ Duplicate comments (1)
airbyte/sources/base.py (1)
770-775: Address the private method access issue.
The stop_event logic is sound, but accessing progress_tracker._log_sync_cancel() violates encapsulation and triggers a linting error. This matches the concern raised in previous reviews. Wdyt about either:
- Making _log_sync_cancel() a public method in ProgressTracker
- Adding a public cancel_sync() method that wraps the private implementation
- Using a different approach that doesn't require direct access to private methods

```diff
 for message in progress_tracker.tally_records_read(message_generator):
     if stop_event and stop_event.is_set():
-        progress_tracker._log_sync_cancel()
+        progress_tracker.cancel_sync()  # Use public method instead
         return
```
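A minimal sketch of the public-wrapper option; ProgressTrackerSketch and cancel_sync() are hypothetical names, not existing PyAirbyte API:

```python
class ProgressTrackerSketch:
    def _log_sync_cancel(self) -> None:
        """Private helper: emit the cancellation log/telemetry event."""
        print("sync canceled")

    def cancel_sync(self) -> None:
        """Public entry point so callers never need to touch the private helper."""
        self._log_sync_cancel()


tracker = ProgressTrackerSketch()
tracker.cancel_sync()  # callers (and linters) only see the public surface
```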
🧹 Nitpick comments (1)
airbyte/sources/base.py (1)
639-714: Great sample display functionality with rich formatting!
The print_samples method provides excellent user experience with markdown formatting and intelligent column pivoting for wide tables. The error handling and visual presentation are well thought out.
One small inconsistency I noticed - internal columns are filtered out in the regular table display (lines 707-709) but not in the pivoted view. Wdyt about applying the same internal column filtering in the pivot case for consistency?

```diff
         if len(dataset.column_names) > col_limit:
             # We'll pivot the columns so each column is its own row
             table.add_column("Column Name")
             for _ in range(len(dataset)):
                 table.add_column(overflow="fold")
-            for col in dataset.column_names:
+            for col in dataset.column_names:
+                if col in internal_cols:
+                    continue
                 table.add_row(
                     Markdown(f"**`{col}`**"),
                     *[escape(str(record[col])) for record in dataset],
                 )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- airbyte/sources/base.py (11 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#411
File: airbyte/sources/util.py:121-123
Timestamp: 2024-10-08T15:34:31.026Z
Learning: In `airbyte/sources/util.py`, the function `get_benchmark_source` is intended for benchmarking purposes in the current iteration. Future plans may include adding more generic functions like `get_dummy_source` or `get_mock_source` with broader functionality.
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#411
File: airbyte/sources/util.py:121-123
Timestamp: 2024-10-06T17:35:27.056Z
Learning: In `airbyte/sources/util.py`, the function `get_benchmark_source` is intended for benchmarking purposes in the current iteration. Future plans may include adding more generic functions like `get_dummy_source` or `get_mock_source` with broader functionality.
airbyte/sources/base.py (4)
Learnt from: Suraj-Vishwakarma70
PR: #285
File: airbyte/sources/base.py:0-0
Timestamp: 2024-10-08T15:34:31.026Z
Learning: Ensure consistent naming for attributes in the Source class in airbyte/sources/base.py, such as renaming _to_be_selected_stream to _to_be_selected_streams.
Learnt from: Suraj-Vishwakarma70
PR: #285
File: airbyte/sources/base.py:0-0
Timestamp: 2024-07-09T06:42:41.304Z
Learning: Ensure consistent naming for attributes in the Source class in airbyte/sources/base.py, such as renaming _to_be_selected_stream to _to_be_selected_streams.
Learnt from: aaronsteers
PR: #411
File: airbyte/sources/util.py:121-123
Timestamp: 2024-10-08T15:34:31.026Z
Learning: In airbyte/sources/util.py, the function get_benchmark_source is intended for benchmarking purposes in the current iteration. Future plans may include adding more generic functions like get_dummy_source or get_mock_source with broader functionality.
Learnt from: aaronsteers
PR: #411
File: airbyte/sources/util.py:121-123
Timestamp: 2024-10-06T17:35:27.056Z
Learning: In airbyte/sources/util.py, the function get_benchmark_source is intended for benchmarking purposes in the current iteration. Future plans may include adding more generic functions like get_dummy_source or get_mock_source with broader functionality.
🧬 Code Graph Analysis (1)
airbyte/sources/base.py (5)
- airbyte/datasets/_inmemory.py (1): InMemoryDataset (19-45)
- airbyte/_connector_base.py (3): ConnectorBase (54-499), name (84-86), _hydrated_config (147-152)
- airbyte/datasets/_lazy.py (2): LazyDataset (21-73), fetch_all (54-59)
- airbyte/_util/temp_files.py (1): as_temp_files (23-66)
- airbyte/progress.py (2): tally_records_read (249-304), _log_sync_cancel (424-429)
🪛 GitHub Actions: Run Linters
airbyte/sources/base.py
[error] 772-772: Ruff SLF001: Private member accessed: _log_sync_cancel.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Windows)
🔇 Additional comments (6)
airbyte/sources/base.py (6)
7-18: LGTM on the new imports and constants!
The added imports support the new sampling and display functionality well. The threading support for stop events, itertools for record limiting, and rich library for formatted output are all appropriate choices for the new features.
Also applies to: 46-63
211-213: Good addition of @override decorator for clarity!
Adding the @override decorator to _get_spec improves code clarity and helps with IDE support. The formatting cleanup in the primary key overrides method is also nice.
Also applies to: 332-333
496-577: Nice implementation of limit and stop_event parameters!
The addition of limit and stop_event parameters to get_records provides good control over record fetching. The use of islice for limiting and passing the stop_event to LazyDataset is well thought out. One small consideration - wdyt about applying the limit earlier in the pipeline? Currently it's applied after record processing, but applying it at the message level could potentially save some processing overhead for large limits.
603-637: Excellent design for the sampling functionality!
The get_samples method provides great flexibility with stream selection, error handling modes, and proper isolation per stream. The error handling logic is well-structured.
Quick question - is the stop_event.set() call on line 625 necessary after fetch_all()? Since fetch_all() should exhaust the iterator naturally, wdyt about whether this explicit set is needed?
749-757: Nice formatting improvement for readability!
The multiline formatting of the as_temp_files call makes the arguments much clearer and easier to read.
736-738: Good API design with keyword-only parameter!
Making stop_event keyword-only.
…733) Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Actionable comments posted: 0
♻️ Duplicate comments (1)
airbyte/sources/base.py (1)
738-780: Stop event handling needs refinement.
This implementation has the same issues identified in previous reviews:
- The stop_event check occurs after yielding from the message generator, which could process many records before checking. Would moving the check into the progress tracker's tally_records_read method provide more responsive cancellation, wdyt?
- Accessing progress_tracker._log_sync_cancel() violates encapsulation. Could we add a public method to handle sync cancellation instead?
The current placement means if tally_records_read yields 1000 records in a batch, all 1000 will be processed before the stop check occurs.
🧹 Nitpick comments (2)
airbyte/sources/base.py (2)
496-498: Nice implementation of controlled record fetching!
The limit and stop_event parameters add valuable control over record reading. The use of islice for limiting is efficient. One small suggestion - wdyt about only creating the default stop_event when it's actually needed for the LazyDataset constructor?

```diff
-    stop_event = stop_event or threading.Event()
+    if stop_event is None:
+        stop_event = threading.Event()
```

This avoids creating an Event object when the caller provided one.
Also applies to: 505-507, 524-524, 564-571, 575-576
603-637: Well-structured sampling method with good error handling!
The flexible stream selection and error handling modes are thoughtful. A couple of questions:
- Is the stop_event.set() call on Line 625 necessary after fetch_all()? The fetch_all() method should exhaust the iterator, making the stop event redundant, wdyt?
- Would it be helpful to be more specific about expected exceptions in the error handling? Currently catching all exceptions might mask unexpected issues.

```diff
-    except Exception as ex:
+    except (exc.AirbyteConnectorFailedError, exc.AirbyteStreamNotFoundError) as ex:
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- airbyte/progress.py (2 hunks)
- airbyte/sources/base.py (9 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte/progress.py
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#411
File: airbyte/sources/util.py:121-123
Timestamp: 2024-10-08T15:34:31.026Z
Learning: In `airbyte/sources/util.py`, the function `get_benchmark_source` is intended for benchmarking purposes in the current iteration. Future plans may include adding more generic functions like `get_dummy_source` or `get_mock_source` with broader functionality.
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#411
File: airbyte/sources/util.py:121-123
Timestamp: 2024-10-06T17:35:27.056Z
Learning: In `airbyte/sources/util.py`, the function `get_benchmark_source` is intended for benchmarking purposes in the current iteration. Future plans may include adding more generic functions like `get_dummy_source` or `get_mock_source` with broader functionality.
📚 Learning: ensure consistent naming for attributes in the `source` class in `airbyte/sources/base.py`, such as ...
Learnt from: Suraj-Vishwakarma70
PR: airbytehq/PyAirbyte#285
File: airbyte/sources/base.py:0-0
Timestamp: 2024-10-08T15:34:31.026Z
Learning: Ensure consistent naming for attributes in the `Source` class in `airbyte/sources/base.py`, such as renaming `_to_be_selected_stream` to `_to_be_selected_streams`.
Applied to files:
airbyte/sources/base.py
📚 Learning: in `airbyte/sources/util.py`, the function `get_benchmark_source` is intended for benchmarking purpo...
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#411
File: airbyte/sources/util.py:121-123
Timestamp: 2024-10-08T15:34:31.026Z
Learning: In `airbyte/sources/util.py`, the function `get_benchmark_source` is intended for benchmarking purposes in the current iteration. Future plans may include adding more generic functions like `get_dummy_source` or `get_mock_source` with broader functionality.
Applied to files:
airbyte/sources/base.py
🧬 Code Graph Analysis (1)
airbyte/sources/base.py (4)
- airbyte/datasets/_inmemory.py (1): InMemoryDataset (19-45)
- airbyte/_connector_base.py (2): ConnectorBase (54-499), name (84-86)
- airbyte/datasets/_lazy.py (2): LazyDataset (21-73), fetch_all (54-59)
- airbyte/progress.py (2): tally_records_read (249-304), _log_sync_cancel (424-431)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (No Creds)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (5)
airbyte/sources/base.py (5)
7-18: LGTM on the new imports!
The imports are well-organized and directly support the new sampling functionality. Good use of the @override decorator and importing constants for internal column filtering.
Also applies to: 46-49, 54-63
639-716: Excellent user experience with the formatted sample display!
The rich formatting, adaptive table layout, and internal column filtering create a polished output. The pivot approach for wide tables (>10 columns) is clever. One small question - is 10 columns the right threshold for pivoting? Might depend on terminal width, but this seems like a reasonable default, wdyt?
The markdown formatting and error handling integration look great!
332-332: Great addition of the @override decorator!
This explicitly marks the method override from ConnectorBase, improving code clarity and helping catch signature mismatches. Nice adherence to best practices!
603-716: Excellent integration with existing codebase patterns!
The new sampling methods follow established conventions for stream selection, error handling, and progress tracking. The integration between LazyDataset.fetch_all() and InMemoryDataset creates a clean abstraction for in-memory sampling. Well done on maintaining consistency with the existing API design!
603-637: Good performance characteristics for sampling use case.
The memory usage is properly bounded by the limit parameter, making InMemoryDataset appropriate for sample storage. The rich formatting overhead is justified for the improved user experience. The approach scales well for typical sampling scenarios.
Also applies to: 639-716
- Add 0.1 second delay after stop event detection to allow multiprocessing workers to finish
- Suppress stderr output for stream preview operations to eliminate BrokenPipeError noise
- Enhance pipe cleanup and subprocess termination handling
- Fix method signature compatibility in DeclarativeExecutor

Resolves loud BrokenPipeError warnings during stream preview functionality while maintaining successful process termination behavior.

Co-Authored-By: AJ Steers <[email protected]>
Actionable comments posted: 0
♻️ Duplicate comments (1)
airbyte/sources/base.py (1)
775-781: Good responsive cancellation logic, but a couple of concerns!
The stop event checking after each message yield provides responsive cancellation, which is great! However, I notice two things:
- There's a past review comment about accessing the private _log_sync_cancel() method - should this be made public or have a public wrapper?
- The time.sleep(0.1) seems a bit arbitrary - what's the reasoning for this specific duration? Is it to allow other threads to process the cancellation?
The overall cancellation approach looks solid though!
🧹 Nitpick comments (3)
airbyte/_executors/base.py (1)
46-50: Smart refinement of error handling! One small suggestion though.
The distinction between BrokenPipeError (expected during shutdown) and other OSError types is excellent. However, I'm wondering if we should be more explicit about the error handling logic. What do you think about adding a brief comment explaining why BrokenPipeError is silently ignored? Something like:

```diff
     except (BrokenPipeError, OSError) as ex:
         if isinstance(ex, BrokenPipeError):
+            # BrokenPipeError is expected when subprocess terminates early
             pass  # Expected during graceful shutdown
```

This would make the intent clearer for future maintainers. wdyt?
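A generic sketch of the shutdown-tolerant cleanup under discussion - swallow BrokenPipeError, surface everything else (illustrative, not the actual executor code):

```python
import subprocess


def close_stdin_quietly(process: subprocess.Popen) -> None:
    try:
        if process.stdin is not None:
            process.stdin.close()
    except BrokenPipeError:
        # Expected during graceful shutdown - the subprocess already exited.
        pass
    except OSError as ex:
        # Anything else is a real problem worth reporting.
        print(f"Unexpected error while closing pipe: {ex!r}")
```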
airbyte/sources/base.py (2)
497-498: Nice addition of limit and stop_event parameters!
These parameters provide great control over record reading. The documentation clearly explains their usage. One small question - would it be more efficient to only create the threading.Event() when stop_event is actually None, rather than always creating one? wdyt?

```diff
-    stop_event = stop_event or threading.Event()
+    if stop_event is None:
+        stop_event = threading.Event()
```

Also applies to: 506-508
604-638: Well-structured get_samples method with good error handling!
The method provides flexible stream selection and error handling modes. One small question - is the stop_event.set() on line 626 necessary after fetch_all() succeeds? Since fetch_all() should naturally exhaust the iterator, the stop event might be redundant there. wdyt?
The error handling with the three modes (raise/ignore/log) gives users nice control over how failures are handled.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- airbyte/_connector_base.py (1 hunks)
- airbyte/_executors/base.py (4 hunks)
- airbyte/_executors/declarative.py (1 hunks)
- airbyte/sources/base.py (9 hunks)
✅ Files skipped from review due to trivial changes (1)
- airbyte/_executors/declarative.py
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#411
File: airbyte/sources/util.py:121-123
Timestamp: 2024-10-06T17:35:27.056Z
Learning: In `airbyte/sources/util.py`, the function `get_benchmark_source` is intended for benchmarking purposes in the current iteration. Future plans may include adding more generic functions like `get_dummy_source` or `get_mock_source` with broader functionality.
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#411
File: airbyte/sources/util.py:121-123
Timestamp: 2024-10-08T15:34:31.026Z
Learning: In `airbyte/sources/util.py`, the function `get_benchmark_source` is intended for benchmarking purposes in the current iteration. Future plans may include adding more generic functions like `get_dummy_source` or `get_mock_source` with broader functionality.
📚 Learning: in pyairbyte, error messages in functions like `_resolve_source_job` in `airbyte/cli.py` are designe...
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#411
File: airbyte/cli.py:111-160
Timestamp: 2024-10-08T15:34:31.026Z
Learning: In PyAirbyte, error messages in functions like `_resolve_source_job` in `airbyte/cli.py` are designed to decouple the message text from dynamic values, following a structlog-inspired design. Dynamic values are provided via parameters like `input_value`. This approach helps avoid including PII in the message strings, which may be used in telemetry.
Applied to files:
airbyte/_executors/base.py
📚 Learning: ensure consistent naming for attributes in the `source` class in `airbyte/sources/base.py`, such as ...
Learnt from: Suraj-Vishwakarma70
PR: airbytehq/PyAirbyte#285
File: airbyte/sources/base.py:0-0
Timestamp: 2024-10-08T15:34:31.026Z
Learning: Ensure consistent naming for attributes in the `Source` class in `airbyte/sources/base.py`, such as renaming `_to_be_selected_stream` to `_to_be_selected_streams`.
Applied to files:
airbyte/sources/base.py
📚 Learning: in `airbyte/sources/util.py`, the function `get_benchmark_source` is intended for benchmarking purpo...
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#411
File: airbyte/sources/util.py:121-123
Timestamp: 2024-10-06T17:35:27.056Z
Learning: In `airbyte/sources/util.py`, the function `get_benchmark_source` is intended for benchmarking purposes in the current iteration. Future plans may include adding more generic functions like `get_dummy_source` or `get_mock_source` with broader functionality.
Applied to files:
airbyte/sources/base.py
🧬 Code Graph Analysis (3)
airbyte/_connector_base.py (2)
- airbyte/_executors/declarative.py (1): execute (102-115)
- airbyte/_executors/base.py (1): execute (205-223)
airbyte/_executors/base.py (1)
- airbyte/_message_iterators.py (1): AirbyteMessageIterator (61-205)
airbyte/sources/base.py (4)
- airbyte/datasets/_inmemory.py (1): InMemoryDataset (19-45)
- airbyte/_connector_base.py (2): ConnectorBase (54-500), name (84-86)
- airbyte/datasets/_lazy.py (2): LazyDataset (21-73), fetch_all (54-59)
- airbyte/progress.py (2): tally_records_read (249-304), _log_sync_cancel (424-431)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (No Creds)
- GitHub Check: Pytest (All, Python 3.11, Windows)
🔇 Additional comments (11)
airbyte/_connector_base.py (1)
468-469: LGTM! Clean integration of stderr suppression with progress tracking.
The logic to suppress stderr when progress tracking is active makes perfect sense - it helps keep the progress output clean and focused. The implementation correctly propagates the parameter down to the executor level.
airbyte/_executors/base.py (3)
70-82: Excellent implementation of conditional stderr handling!
The logic correctly uses subprocess.DEVNULL when suppress_stderr=True and falls back to log_file otherwise. The implementation handles both code paths (with and without AirbyteMessageIterator) consistently.
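A minimal sketch of the conditional stderr routing described here; the function and parameter names are illustrative, not the actual executor implementation:

```python
import subprocess


def start_connector(args: list[str], *, suppress_stderr: bool, log_file_path: str) -> subprocess.Popen:
    """Route stderr to DEVNULL when suppressed, otherwise append it to a log file."""
    if suppress_stderr:
        return subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    log_file = open(log_file_path, "a", encoding="utf-8")  # caller owns closing this handle
    return subprocess.Popen(args, stdout=subprocess.PIPE, stderr=log_file)
```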
105-113: Consistent stderr handling in the alternative code path.
Good to see the same conditional logic applied here for the case when stdin is not an AirbyteMessageIterator. The implementation maintains consistency across both subprocess creation scenarios.
210-222: Well-documented parameter addition!
The suppress_stderr parameter is properly added to the method signature with appropriate typing and clear documentation. The parameter propagation to _stream_from_subprocess is correct.
airbyte/sources/base.py (7)
7-19: New imports look well-structured for the sampling functionality!
The threading, time, islice, and rich imports are all appropriately used in the new sampling methods. The rich components will provide nice formatted output for the print_samples method.
47-55: Good practice moving imports to TYPE_CHECKING block!
Moving AirbyteStream and ConnectorSpecification to the TYPE_CHECKING block helps avoid potential circular imports while keeping type annotations clean.
60-64: Appropriate constants for filtering internal columns!
These Airbyte column constants are properly used in the print_samples method to hide internal metadata from the user-facing display.
333-333: Excellent addition of the @override decorator!
This makes it explicit that _get_spec overrides the parent method from ConnectorBase, which helps with maintainability and catching signature mismatches.
569-571: Perfect placement of the limit using islice!
The limit is correctly applied to the final processed records rather than raw messages, which gives users the expected behavior.
640-716: Excellent print_samples method with great UX!
Love the thoughtful features here:
- Rich markdown formatting for clear section headers
- Smart table pivoting when there are too many columns (>10 limit seems perfect)
- Filtering out internal Airbyte columns so users see only their data
- Graceful error handling with clear warning messages
The use of escape() for user data is also a nice security touch to prevent rich markup injection.
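A quick illustration of why escaping matters when rendering user data through rich:

```python
from rich.console import Console
from rich.markup import escape

console = Console()
untrusted = "value with [red]markup-looking[/red] text"

console.print(untrusted)          # rich interprets the [red]...[/red] tags as styling
console.print(escape(untrusted))  # printed literally - record values cannot inject markup
```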
739-741: Great addition of keyword-only arguments!
Making stop_event keyword-only prevents positional argument confusion and makes the API more explicit. This is especially valuable for optional parameters that could easily be misplaced.
Summary by CodeRabbit
New Features
Enhancements
Important
Auto-merge enabled.
This PR is set to merge automatically when all requirements are met.