
Conversation

aldogonzalez8
Contributor

@aldogonzalez8 aldogonzalez8 commented Jul 5, 2025

What

Logging like this is very difficult to interpret; it's hard to tell what is going on:

(screenshot of the rate-limit retry logs in question)

  1. Many of the logs appear to have occurred at the same time, which is highly unlikely, as there is only one worker.
  2. What exactly is being rate-limited? Is it always the same slice, or are we progressing?

Update: Actually, the minimum number of workers should be 2, so this could be even more complicated to examine, which makes the extra logging all the more valuable.

How

Add more logging. I don't believe there is a problem with adding the slice information and the UTC time at which the log occurred.
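
For illustration, the enhanced retry log line would look roughly like the sketch below. The helper name is hypothetical; the message wording follows the test assertion quoted later in this thread, and the UTC timestamp handling is an assumption about the implementation.

import logging
from datetime import datetime, timezone

logger = logging.getLogger("airbyte")

def log_retry(error: str, tries: int, wait: int, stream_slice: dict) -> None:
    # Hypothetical helper: prefix the retry message with the current UTC time
    # and append the slice being retried, so interleaved worker logs can be
    # told apart.
    now = datetime.now(timezone.utc).isoformat()
    logger.info(
        f"{now} - Caught retryable error '{error}' after {tries} tries. "
        f"Waiting {wait} seconds then retrying for slice: {stream_slice}..."
    )

log_retry("Too many requests.", 1, 1, {"start": "2012"})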

Summary by CodeRabbit

  • Enhancements
    • Improved log messages for rate limit retries by including the current timestamp and contextual stream information, providing clearer context for retry attempts.
    • Added support for passing stream slice context during HTTP requests and retries to enhance rate limit handling and backoff behavior.
  • New Features
    • Introduced a thread-safe singleton to manage request-specific context across threads.
  • Bug Fixes
    • Fixed rate limit retry logging to include relevant stream slice details.
  • Tests
    • Added tests verifying singleton behavior and thread safety of the new request context manager.
    • Added tests ensuring proper handling and logging of rate-limited HTTP requests.

@aldogonzalez8 aldogonzalez8 self-assigned this Jul 5, 2025
@github-actions github-actions bot added the enhancement New feature or request label Jul 5, 2025

github-actions bot commented Jul 5, 2025

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@aldogonzalez8/rate-limitmg-logging#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch aldogonzalez8/rate-limitmg-logging

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /poe <command> - Runs any poe command in the CDK environment


Contributor

coderabbitai bot commented Jul 5, 2025

📝 Walkthrough

Walkthrough

The update enhances the backoff handler to accept an optional stream_slice parameter for contextual information and includes this in the rate limit logging with a timestamp. Corresponding methods in HttpClient, HttpRequester, and HttpStream were updated to pass stream_slice through the call chain, enabling richer context during request retries without altering existing logic. Additionally, a thread-safe singleton RequestLocal class was introduced to store and access the current stream_slice context. Tests were added to verify the singleton behavior and thread-local data isolation. A new test for rate limit handling in HttpRequester was also added.
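
A minimal sketch of that call chain, with heavily simplified signatures assumed for illustration (the real methods take many more parameters):

from typing import Any, Mapping, Optional

def user_defined_backoff_handler(
    max_tries: int, stream_slice: Optional[Mapping[str, Any]] = None
) -> None:
    # The handler can now mention which slice is being retried.
    print(f"retrying up to {max_tries} times for slice: {stream_slice}")

class HttpClient:
    def send_request(
        self, url: str, stream_slice: Optional[Mapping[str, Any]] = None
    ) -> None:
        self._send_with_retry(url, stream_slice=stream_slice)

    def _send_with_retry(
        self, url: str, stream_slice: Optional[Mapping[str, Any]] = None
    ) -> None:
        # Forward the slice to the backoff handler for contextual logging.
        user_defined_backoff_handler(5, stream_slice=stream_slice)

class HttpStream:
    def __init__(self) -> None:
        self._http_client = HttpClient()

    def _fetch_next_page(self, stream_slice: Mapping[str, Any]) -> None:
        self._http_client.send_request("https://example.com", stream_slice=stream_slice)

HttpStream()._fetch_next_page({"start": "2012"})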

Changes

File(s) Change Summary
airbyte_cdk/sources/streams/http/rate_limiting.py Added optional stream_slice parameter to user_defined_backoff_handler; enhanced log messages with timestamp and stream slice info.
airbyte_cdk/sources/streams/http/http_client.py Added optional stream_slice parameter to _send_with_retry and send_request methods; passed stream_slice to backoff handler.
airbyte_cdk/sources/declarative/requesters/http_requester.py Passed stream_slice argument to _http_client.send_request call; set stream_slice in RequestLocal before sending request.
airbyte_cdk/sources/streams/http/http.py Passed stream_slice argument to _http_client.send_request call inside _fetch_next_page method.
airbyte_cdk/sources/declarative/request_local/__init__.py Added module exporting RequestLocal class.
airbyte_cdk/sources/declarative/request_local/request_local.py Added RequestLocal singleton class with thread-safe initialization and thread-local stream_slice property.
unit_tests/sources/declarative/request_local/test_request_local.py Added tests for RequestLocal singleton behavior, thread safety, and thread-local data isolation.
unit_tests/sources/declarative/requesters/test_http_requester.py Added helper to create rate-limited HttpRequester and test verifying rate limit backoff exception and log output.

Sequence Diagram(s)

sequenceDiagram
    participant HttpStream
    participant HttpRequester
    participant HttpClient
    participant RequestLocal
    participant BackoffHandler

    HttpStream->>HttpClient: send_request(stream_slice)
    HttpRequester->>RequestLocal: set stream_slice
    HttpRequester->>HttpClient: send_request(stream_slice)
    HttpClient->>BackoffHandler: user_defined_backoff_handler(stream_slice)
    BackoffHandler->>RequestLocal: get stream_slice
    BackoffHandler-->>HttpClient: retry with backoff and logging including stream_slice and timestamp

Suggested labels

chore

Suggested reviewers

  • lazebnyi
  • maxi297

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
airbyte_cdk/sources/streams/http/rate_limiting.py (1)

114-114: Fix the typo in the comment.

There's a small typo in the comment: "og" should be "of". Could you fix this? wdyt?

-            # include logging og the current time to help with debugging
+            # include logging of the current time to help with debugging
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9ec30dc and a3df9c5.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/streams/http/rate_limiting.py (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/streams/http/rate_limiting.py

[error] 112-114: ruff formatting check failed. File requires reformatting to comply with style rules.

⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)


github-actions bot commented Jul 5, 2025

PyTest Results (Fast)

3 697 tests  +4   3 686 ✅ +4   6m 21s ⏱️ +2s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 12f8675. ± Comparison against base commit 9ec30dc.

♻️ This comment has been updated with latest results.


github-actions bot commented Jul 5, 2025

PyTest Results (Full)

3 700 tests  +4   3 689 ✅ +4   18m 5s ⏱️ +12s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 12f8675. ± Comparison against base commit 9ec30dc.

♻️ This comment has been updated with latest results.

@aldogonzalez8
Contributor Author

aldogonzalez8 commented Jul 5, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a3df9c5 and 8ff11cf.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/streams/http/http_client.py (1 hunks)
  • airbyte_cdk/sources/streams/http/rate_limiting.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/streams/http/rate_limiting.py
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/streams/http/http_client.py

[error] 259-261: Ruff formatting check failed. Code style issues detected; run 'ruff format --fix' to reformat this file.

⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Pytest (Fast)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/streams/http/http_client.py (1)

262-264: Great enhancement for debugging context!

This change adds valuable debugging information by passing the request URL to the backoff handler. The implementation looks correct and aligns well with the PR objective of improving debugging capabilities, wdyt?

@aldogonzalez8
Contributor Author

aldogonzalez8 commented Jul 5, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

octavia-squidington-iii and others added 2 commits July 5, 2025 14:07
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a71c505 and ad606dc.

📒 Files selected for processing (4)
  • airbyte_cdk/sources/declarative/requesters/http_requester.py (1 hunks)
  • airbyte_cdk/sources/streams/http/http.py (1 hunks)
  • airbyte_cdk/sources/streams/http/http_client.py (4 hunks)
  • airbyte_cdk/sources/streams/http/rate_limiting.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • airbyte_cdk/sources/streams/http/rate_limiting.py
  • airbyte_cdk/sources/streams/http/http_client.py
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#13
File: airbyte_cdk/connector.py:99-99
Timestamp: 2024-11-10T04:50:11.914Z
Learning: When a PR's goal is to run the autoformat task from `ruff`, avoid suggesting code changes beyond formatting to prevent potential negative side effects.
🪛 GitHub Actions: Linters
airbyte_cdk/sources/streams/http/http.py

[error] 546-546: Ruff formatting check failed. Code style issues detected; run 'ruff format --fix' to fix.

⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: source-shopify
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/requesters/http_requester.py (1)

476-476: Perfect consistency with the HttpStream changes, wdyt?

The addition of stream_slice=stream_slice here maintains consistency with the changes in HttpStream._fetch_next_page, ensuring that stream slice context flows through both the declarative and non-declarative HTTP request paths.

@aldogonzalez8
Contributor Author

aldogonzalez8 commented Jul 6, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

@aldogonzalez8 aldogonzalez8 changed the title feat(rate-lmimit): update debugging feat(rate-limit): update debugging Jul 6, 2025
@aldogonzalez8 aldogonzalez8 changed the title feat(rate-limit): update debugging feat(rate-limit): add logging with more contect Jul 6, 2025
@aldogonzalez8 aldogonzalez8 changed the title feat(rate-limit): add logging with more contect feat(rate-limit): add logging with more contetx Jul 6, 2025
@aldogonzalez8 aldogonzalez8 changed the title feat(rate-limit): add logging with more contetx feat(rate-limit): add logging with more context Jul 6, 2025
@aldogonzalez8
Contributor Author

aldogonzalez8 commented Jul 6, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

dedupe_query_params: bool = False,
log_formatter: Optional[Callable[[requests.Response], Any]] = None,
exit_on_rate_limit: Optional[bool] = False,
stream_slice: Optional[Mapping[str, Any]] = None,
Contributor

@maxi297 maxi297 Jul 6, 2025


Is the connector you are trying to update low-code?

Wild idea: instead of having to modify all the interfaces just to pass more information down the chain, could we create a thread-local in a threading.py file that anyone who wants to log more information, like the slice, could access? PartitionReader.process_partition would basically do thread_local.stream_slice = partition.to_slice, and at that point http_client can just get the information from thread_local.stream_slice. If anyone other than the HttpClient needs access to this information, they just do the same.

We can probably assume it would work not only for the concurrent CDK, since we could register the information in the thread-local even in a non-concurrent world. I just haven't checked where that would happen.
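
A minimal sketch of that idea, assuming a hypothetical shared module and the names used above (thread_local, PartitionReader.process_partition):

import threading
from typing import Any, Mapping, Optional

# Hypothetical module-level thread-local, e.g. in a shared threading helper.
thread_local = threading.local()

# In PartitionReader.process_partition (assumed location), per the idea above:
#     thread_local.stream_slice = partition.to_slice()

def current_stream_slice() -> Optional[Mapping[str, Any]]:
    # HttpClient (or anyone else) reads the slice back without any interface
    # or signature changes; returns None if the current thread never set it.
    return getattr(thread_local, "stream_slice", None)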

Contributor Author


Is the connector you are trying to update low-code?

@maxi297 Yes, it's Slack. I received feedback from a user who, after a few hours, couldn't tell whether the attempt was "stuck". Honestly, I couldn't tell either, as the rate-limit logs don't say much.

I was recently working on bumping this connector to CDK 6 and was trying to get better logs on the fly.

Contributor Author


@maxi297 I need to do some cleanup, but I added a singleton class that inherits from threading.local and added some tests around it.
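
For reference, pieced together from the snippets quoted in the review comments below, the class looks roughly like this (the threading.local base and the _instance/_lock class-attribute declarations are inferred from the double-checked locking code, not shown verbatim in this thread):

import threading

class RequestLocal(threading.local):
    # Singleton whose attribute storage is per-thread: every thread sees the
    # same instance, but each thread gets its own stream_slice value.
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        # Double-checked locking for thread-safe singleton creation.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super(RequestLocal, cls).__new__(cls)
        return cls._instance

    def __init__(self):
        # Runs on every RequestLocal() call; the hasattr check is evaluated
        # against per-thread storage, so each new thread starts with None.
        if not hasattr(self, "_initialized"):
            self._stream_slice = None
            self._initialized = True

    @property
    def stream_slice(self):
        return self._stream_slice

    @stream_slice.setter
    def stream_slice(self, stream_slice):
        self._stream_slice = stream_slice

# Usage: set in the requester, read back later on the same thread.
RequestLocal().stream_slice = {"start": "2012"}
assert RequestLocal().stream_slice == {"start": "2012"}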

Comment on lines +32 to +39
    def get_instance(cls):
        """
        Get the singleton instance of RequestLocal.
        This is the recommended way to get the instance.
        """
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance
Contributor Author


I think we don't need this

dedupe_query_params=True,
log_formatter=log_formatter,
exit_on_rate_limit=self._exit_on_rate_limit,
stream_slice=stream_slice,
Contributor Author


remove this.

dedupe_query_params=True,
log_formatter=self.get_log_formatter(),
exit_on_rate_limit=self.exit_on_rate_limit,
stream_slice=stream_slice,
Contributor Author


remove this

request_kwargs: Mapping[str, Any],
log_formatter: Optional[Callable[[requests.Response], Any]] = None,
exit_on_rate_limit: Optional[bool] = False,
stream_slice: Optional[Mapping[str, Any]] = None,
Contributor Author


remove this

request_kwargs=request_kwargs,
log_formatter=log_formatter,
exit_on_rate_limit=exit_on_rate_limit,
stream_slice=stream_slice,
Contributor Author


remove this



logged_messages = [record.message for record in caplog.records]
assert "Caught retryable error 'Too many requests.' after 1 tries. Waiting 1 seconds then retrying for slice: {'start': '2012'}..." in logged_messages
Contributor Author


Here we can see the stream slice read from request_local.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 7

🧹 Nitpick comments (4)
unit_tests/sources/declarative/request_local/test_request_local.py (4)

1-6: Remove unused imports

The sys, os, and ThreadPoolExecutor imports are not used in the active code. Would you consider removing them? wdyt?

-import sys
-import os
 import threading
 import time
-from concurrent.futures import ThreadPoolExecutor

25-33: Remove duplicate assignment

There's a duplicate assignment of stream_slice that can be removed.

     # get_instance class method
     instance4 = RequestLocal.get_instance()
     instance1.stream_slice = {"test": "data"}

-    # stream_slice property
-    instance1.stream_slice = {"test": "data"}
     assert instance1.stream_slice is instance4.stream_slice
     assert instance2.stream_slice is instance4.stream_slice

48-71: Fix typo in variable name

There's a typo in the variable name.

     results = {}
     threads = []
-    total_treads = 5
+    total_threads = 5
     # Create multiple threads that instantiate RequestLocal
-    for i in range(total_treads):
+    for i in range(total_threads):
         thread = threading.Thread(target=create_instance_in_thread, args=(i, results))
         threads.append(thread)
         thread.start()

     # Wait for all threads to complete
     for thread in threads:
         thread.join()

     # Analyze results
     instance_ids = [result[INSTANCE_ID_KEY] for result in results.values()]
     unique_ids = set(instance_ids)

-    assert len(results) == total_treads, "All threads should have created an instance"
+    assert len(results) == total_threads, "All threads should have created an instance"
     assert len(unique_ids) == 1, "All threads should see the same singleton instance"

138-139: Use standard comment format

Would you consider using the standard FIXME: or TODO: format for the comment? wdyt?

-# Fixme: Uncomment this test put asserts and remove prints to test concurrent access
+# TODO: Uncomment this test, add asserts and remove prints to test concurrent access
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3f42896 and 12f8675.

📒 Files selected for processing (6)
  • airbyte_cdk/sources/declarative/request_local/__init__.py (1 hunks)
  • airbyte_cdk/sources/declarative/request_local/request_local.py (1 hunks)
  • airbyte_cdk/sources/declarative/requesters/http_requester.py (3 hunks)
  • airbyte_cdk/sources/streams/http/rate_limiting.py (4 hunks)
  • unit_tests/sources/declarative/request_local/test_request_local.py (1 hunks)
  • unit_tests/sources/declarative/requesters/test_http_requester.py (4 hunks)
✅ Files skipped from review due to trivial changes (1)
  • airbyte_cdk/sources/declarative/request_local/__init__.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • airbyte_cdk/sources/declarative/requesters/http_requester.py
  • airbyte_cdk/sources/streams/http/rate_limiting.py
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/request_local/request_local.py

[error] 7-7: mypy: Function is missing a type annotation. (no-untyped-def)


[error] 15-15: mypy: Function is missing a return type annotation. Use '-> None' if function does not return a value. (no-untyped-def)


[error] 24-24: mypy: Function is missing a return type annotation. (no-untyped-def)


[error] 28-28: mypy: Function is missing a type annotation. (no-untyped-def)


[error] 32-32: mypy: Function is missing a return type annotation. (no-untyped-def)

unit_tests/sources/declarative/request_local/test_request_local.py

[error] 10-180: Ruff formatting check failed. The test file contains inconsistent spacing and formatting issues.

unit_tests/sources/declarative/requesters/test_http_requester.py

[error] 217-270: Ruff formatting check failed. The test file has extra blank lines and spacing inconsistencies.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
unit_tests/sources/declarative/requesters/test_http_requester.py (1)

220-243: LGTM!

The helper function correctly sets up a requester that simulates rate-limited responses.

Comment on lines +23 to +25
    @property
    def stream_slice(self):
        return self._stream_slice
Contributor


⚠️ Potential issue

Add return type annotation to property getter

The property getter needs a return type annotation.

     @property
-    def stream_slice(self):
+    def stream_slice(self) -> Optional[Dict[str, Any]]:
         return self._stream_slice

Don't forget to import the necessary types at the top of the file:

from typing import Optional, Dict, Any
🧰 Tools
🪛 GitHub Actions: Linters

[error] 24-24: mypy: Function is missing a return type annotation. (no-untyped-def)

🤖 Prompt for AI Agents
In airbyte_cdk/sources/declarative/request_local/request_local.py around lines
23 to 25, the stream_slice property getter lacks a return type annotation. Add a
return type annotation to the method signature, such as Optional[Dict[str,
Any]], and ensure the necessary types Optional, Dict, and Any are imported from
typing at the top of the file.

Comment on lines +27 to +29
    @stream_slice.setter
    def stream_slice(self, stream_slice):
        self._stream_slice = stream_slice
Contributor


⚠️ Potential issue

Add type annotation to property setter

The setter needs a type annotation for the parameter.

     @stream_slice.setter
-    def stream_slice(self, stream_slice):
+    def stream_slice(self, stream_slice: Optional[Dict[str, Any]]) -> None:
         self._stream_slice = stream_slice
🧰 Tools
🪛 GitHub Actions: Linters

[error] 28-28: mypy: Function is missing a type annotation. (no-untyped-def)

🤖 Prompt for AI Agents
In airbyte_cdk/sources/declarative/request_local/request_local.py around lines
27 to 29, the stream_slice setter method lacks a type annotation for its
parameter. Add the appropriate type annotation to the stream_slice parameter in
the setter definition to improve code clarity and type checking.

Comment on lines +7 to +13
    def __new__(cls, *args, **kwargs):
        # Use double-checked locking for thread safety
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super(RequestLocal, cls).__new__(cls)
        return cls._instance
Contributor


⚠️ Potential issue

Add type annotations to fix mypy errors

The __new__ method is missing type annotations. Would you consider adding them to satisfy the mypy checks? wdyt?

-    def __new__(cls, *args, **kwargs):
+    def __new__(cls, *args, **kwargs) -> "RequestLocal":
🧰 Tools
🪛 GitHub Actions: Linters

[error] 7-7: mypy: Function is missing a type annotation. (no-untyped-def)

🤖 Prompt for AI Agents
In airbyte_cdk/sources/declarative/request_local/request_local.py around lines 7
to 13, the __new__ method lacks type annotations causing mypy errors. Add
appropriate type annotations to the __new__ method signature, specifying the
class type for cls and the return type as an instance of the class, to satisfy
static type checking.

Comment on lines +31 to +39
    @classmethod
    def get_instance(cls):
        """
        Get the singleton instance of RequestLocal.
        This is the recommended way to get the instance.
        """
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance
Contributor


🛠️ Refactor suggestion

⚠️ Potential issue

Add return type annotation and simplify implementation

The class method needs a return type annotation. Also, since __new__ already handles singleton creation with proper locking, would it make sense to simplify this method? wdyt?

     @classmethod
-    def get_instance(cls):
+    def get_instance(cls) -> "RequestLocal":
         """
         Get the singleton instance of RequestLocal.
         This is the recommended way to get the instance.
         """
-        if cls._instance is None:
-            cls._instance = cls()
-        return cls._instance
+        return cls()
🧰 Tools
🪛 GitHub Actions: Linters

[error] 32-32: mypy: Function is missing a return type annotation. (no-untyped-def)

🤖 Prompt for AI Agents
In airbyte_cdk/sources/declarative/request_local/request_local.py around lines
31 to 39, add a return type annotation to the get_instance class method to
specify it returns an instance of the class. Since the singleton pattern is
already handled in the __new__ method with locking, simplify get_instance by
removing the explicit instance check and creation, and just return cls()
directly.

Comment on lines +15 to +21
    def __init__(self):
        # __init__ will be called every time the class is instantiated,
        # but the object itself is only created once by __new__.
        # Use a flag to prevent re-initialization
        if not hasattr(self, '_initialized'):
            self._stream_slice = None  # Initialize _stream_slice
            self._initialized = True
Contributor


⚠️ Potential issue

Add return type annotation and consider thread-safe initialization

The __init__ method needs a return type annotation. Also, the initialization check using hasattr might have edge cases in concurrent scenarios. Would you consider using a more robust approach? wdyt?

-    def __init__(self):
+    def __init__(self) -> None:
         # __init__ will be called every time the class is instantiated,
         # but the object itself is only created once by __new__.
         # Use a flag to prevent re-initialization
-        if not hasattr(self, '_initialized'):
+        with self._lock:
+            if not hasattr(self, '_initialized'):
+                self._stream_slice = None  # Initialize _stream_slice
+                self._initialized = True
-            self._stream_slice = None  # Initialize _stream_slice
-            self._initialized = True

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 GitHub Actions: Linters

[error] 15-15: mypy: Function is missing a return type annotation. Use '-> None' if function does not return a value. (no-untyped-def)

🤖 Prompt for AI Agents
In airbyte_cdk/sources/declarative/request_local/request_local.py around lines
15 to 21, add a return type annotation of None to the __init__ method. To
improve thread safety during initialization, replace the hasattr check with a
thread-safe mechanism such as using a threading.Lock or a class-level flag
protected by synchronization to ensure the initialization code runs only once
even in concurrent scenarios.

Comment on lines +257 to +269
@pytest.mark.usefixtures("mock_sleep")
def test_send_request_rate_limited(caplog):
options_provider = MagicMock()
options_provider.get_request_headers.return_value = {"my_header": "my_value"}
requester = create_requester_rate_limited()
requester._request_options_provider = options_provider
with caplog.at_level(logging.INFO, logger="airbyte"):
with pytest.raises(RateLimitBackoffException):
requester.send_request(stream_slice={"start": "2012"})


logged_messages = [record.message for record in caplog.records]
assert "Caught retryable error 'Too many requests.' after 1 tries. Waiting 1 seconds then retrying for slice: {'start': '2012'}..." in logged_messages
Contributor


⚠️ Potential issue

Remove extra blank line and consider more flexible log assertion

There's an extra blank line that should be removed. Also, the exact log message match might be brittle - would you consider using a partial match instead? wdyt?

         with pytest.raises(RateLimitBackoffException):
             requester.send_request(stream_slice={"start": "2012"})
-
-
     logged_messages = [record.message for record in caplog.records]
-    assert "Caught retryable error 'Too many requests.' after 1 tries. Waiting 1 seconds then retrying for slice: {'start': '2012'}..." in logged_messages
+    assert any("Too many requests" in msg and "{'start': '2012'}" in msg for msg in logged_messages)
🤖 Prompt for AI Agents
In unit_tests/sources/declarative/requesters/test_http_requester.py around lines
257 to 269, remove the extra blank line between the with pytest.raises block and
the logged_messages assignment. Change the assertion on the log message to use a
partial match or containment check instead of exact string equality to make the
test less brittle to minor log message changes.

Comment on lines +74 to +137
def test_threading_local_behavior():
    """Test how threading.local affects the singleton"""
    def thread_func(thread_name, shared_results, time_sleep):
        instance = RequestLocal()
        assert instance.stream_slice == None, "Initial stream_slice should be empty"
        instance.stream_slice = {f"data_from_{thread_name}": True}

        shared_results[thread_name] = {
            'instance_id': id(instance),
            'stream_slice': instance.stream_slice.copy(),
            'thread_id': threading.get_ident()
        }

        # Check if we can see data from other threads
        # this should not happen as RequestLocal is a singleton
        time.sleep(time_sleep)
        shared_results[f"{thread_name}_after_sleep"] = {
            'instance_id': id(instance),
            'stream_slice': instance.stream_slice.copy(),
            'end_time': time.time(),
        }

    results = {}
    threads = {}
    threads_amount = 3
    time_sleep = 0.9
    thread_names = []
    for i in range(threads_amount):
        tread_name = f"thread_{i}"
        thread_names.append(tread_name)
        thread = threading.Thread(target=thread_func, args=(tread_name, results, time_sleep))
        time_sleep /=3  # Decrease sleep time for each thread to ensure they overlap
        threads[tread_name]= thread
        thread.start()

    for _, thread in threads.items():
        thread.join()

    end_times = [results[thread_name + "_after_sleep"]['end_time'] for thread_name in thread_names]
    last_end_time = end_times.pop()
    while end_times:
        current_end_time = end_times.pop()
        # Just checking the last thread created ended before the previous ones
        # so we could ensure the first thread created that sleep for a longer time
        # was not affected by the other threads
        assert last_end_time < current_end_time, "End times should be in increasing order"
        last_end_time = current_end_time

    assert len(thread_names) > 1
    assert len(set(thread_names)) == len(thread_names), "Thread names should be unique"
    for curren_thread_name in thread_names:
        current_thread_name_after_sleep = f"{curren_thread_name}_after_sleep"
        assert results[curren_thread_name][STREAM_SLICE_KEY] == results[current_thread_name_after_sleep][STREAM_SLICE_KEY], \
            f"Stream slice should remain consistent across thread {curren_thread_name} before and after sleep"
        assert results[curren_thread_name][INSTANCE_ID_KEY] == results[current_thread_name_after_sleep][INSTANCE_ID_KEY], \
            f"Instance ID should remain consistent across thread {curren_thread_name} before and after sleep"

        # Check if stream slices are different across threads
        # but same instance ID
        for other_tread_name in [thread_name for thread_name in thread_names if thread_name != curren_thread_name]:
            assert results[curren_thread_name][STREAM_SLICE_KEY] != results[other_tread_name][STREAM_SLICE_KEY], \
                f"Stream slices from different threads should not be the same: {curren_thread_name} vs {other_tread_name}"
            assert results[curren_thread_name][INSTANCE_ID_KEY] == results[other_tread_name][INSTANCE_ID_KEY]

Contributor


⚠️ Potential issue

Fix multiple typos and formatting issues

There are several typos and formatting issues that need to be addressed.

         }
-    
+
     results = {}
     threads = {}
     threads_amount = 3
     time_sleep = 0.9
     thread_names = []
     for i in range(threads_amount):
-        tread_name = f"thread_{i}"
-        thread_names.append(tread_name)
-        thread = threading.Thread(target=thread_func, args=(tread_name, results, time_sleep))
-        time_sleep /=3  # Decrease sleep time for each thread to ensure they overlap
-        threads[tread_name]= thread
+        thread_name = f"thread_{i}"
+        thread_names.append(thread_name)
+        thread = threading.Thread(target=thread_func, args=(thread_name, results, time_sleep))
+        time_sleep /= 3  # Decrease sleep time for each thread to ensure they overlap
+        threads[thread_name] = thread
         thread.start()

     for _, thread in threads.items():
         thread.join()

     end_times = [results[thread_name + "_after_sleep"]['end_time'] for thread_name in thread_names]
     last_end_time = end_times.pop()
     while end_times:
         current_end_time = end_times.pop()
         # Just checking the last thread created ended before the previous ones
         # so we could ensure the first thread created that sleep for a longer time
         # was not affected by the other threads
         assert last_end_time < current_end_time, "End times should be in increasing order"
         last_end_time = current_end_time

     assert len(thread_names) > 1
     assert len(set(thread_names)) == len(thread_names), "Thread names should be unique"
-    for curren_thread_name in thread_names:
-        current_thread_name_after_sleep = f"{curren_thread_name}_after_sleep"
-        assert results[curren_thread_name][STREAM_SLICE_KEY] == results[current_thread_name_after_sleep][STREAM_SLICE_KEY], \
-            f"Stream slice should remain consistent across thread {curren_thread_name} before and after sleep"
-        assert results[curren_thread_name][INSTANCE_ID_KEY] == results[current_thread_name_after_sleep][INSTANCE_ID_KEY], \
-            f"Instance ID should remain consistent across thread {curren_thread_name} before and after sleep"
+    for current_thread_name in thread_names:
+        current_thread_name_after_sleep = f"{current_thread_name}_after_sleep"
+        assert results[current_thread_name][STREAM_SLICE_KEY] == results[current_thread_name_after_sleep][STREAM_SLICE_KEY], \
+            f"Stream slice should remain consistent across thread {current_thread_name} before and after sleep"
+        assert results[current_thread_name][INSTANCE_ID_KEY] == results[current_thread_name_after_sleep][INSTANCE_ID_KEY], \
+            f"Instance ID should remain consistent across thread {current_thread_name} before and after sleep"

         # Check if stream slices are different across threads
         # but same instance ID
-        for other_tread_name in [thread_name for thread_name in thread_names if thread_name != curren_thread_name]:
-            assert results[curren_thread_name][STREAM_SLICE_KEY] != results[other_tread_name][STREAM_SLICE_KEY], \
-                f"Stream slices from different threads should not be the same: {curren_thread_name} vs {other_tread_name}"
-            assert results[curren_thread_name][INSTANCE_ID_KEY] == results[other_tread_name][INSTANCE_ID_KEY]
+        for other_thread_name in [thread_name for thread_name in thread_names if thread_name != current_thread_name]:
+            assert results[current_thread_name][STREAM_SLICE_KEY] != results[other_thread_name][STREAM_SLICE_KEY], \
+                f"Stream slices from different threads should not be the same: {current_thread_name} vs {other_thread_name}"
+            assert results[current_thread_name][INSTANCE_ID_KEY] == results[other_thread_name][INSTANCE_ID_KEY]
🤖 Prompt for AI Agents
In unit_tests/sources/declarative/request_local/test_request_local.py between
lines 74 and 137, fix typos such as "tread_name" to "thread_name" and
"curren_thread_name" to "current_thread_name". Also, correct variable names like
"other_tread_name" to "other_thread_name". Ensure consistent formatting and
spacing throughout the function for readability and correctness.

@aldogonzalez8 aldogonzalez8 changed the title feat(rate-limit): add logging with more context feat(rate-limit): add logging with slice information Jul 12, 2025