Consolidate consumer classes and add Avro support#58
📝 Walkthrough
This PR consolidates SmartConsumer variants into
Sequence Diagram
sequenceDiagram
participant App as Application
participant Consumer
participant SerDe as SerDe
participant Schema as Schema Service
participant AMQP as AMQP Broker
App->>Consumer: publish_message(body, properties, no_serialization?, no_encoding?)
alt needs serialization and no_serialization=False
Consumer->>SerDe: determine serializer by content_type
alt content_type == application/vnd.apache.avro.datum
Consumer->>Schema: _load_avro_schema(message_type, schema_uri_format)
Schema-->>Consumer: schema
Consumer->>SerDe: _serialize_avro(body, schema)
else JSON/msgpack/pickle/...
Consumer->>SerDe: serialize(body)
end
else skip serialization
Consumer-->>SerDe: use body as-is
end
alt encoding required and no_encoding=False
Consumer->>SerDe: apply content_encoding (gzip/bzip2)
end
Consumer->>AMQP: basic_publish(final_body, properties)
AMQP->>Consumer: deliver message
App->>Consumer: access .body
alt cached
Consumer-->>App: return cached materialized body
else
Consumer->>SerDe: decode content_encoding if present
SerDe-->>Consumer: decoded_bytes
alt content_type == application/vnd.apache.avro.datum
Consumer->>Schema: _load_avro_schema(message_type, schema_uri_format)
Schema-->>Consumer: schema
Consumer->>SerDe: _deserialize_avro(decoded_bytes, schema)
else JSON/msgpack/pickle/...
Consumer->>SerDe: deserialize(decoded_bytes)
end
Consumer-->>App: return deserialized value (cached)
end
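The two halves of the diagram can be sketched in plain Python. This is only an illustration of the ordering (serialize then encode on publish, decode then deserialize on read); `encode_body`, `decode_body`, and the lookup tables are hypothetical names, only JSON is handled, and the real codec helpers in `rejected/consumer.py` may be organized differently.

```python
import bz2
import gzip
import json

# Hypothetical lookup tables keyed by the content_encoding property.
ENCODERS = {'gzip': gzip.compress, 'bzip2': bz2.compress}
DECODERS = {'gzip': gzip.decompress, 'bzip2': bz2.decompress}


def encode_body(body, content_type, content_encoding=None):
    """Serialize by content type, then apply the content encoding."""
    if content_type == 'application/json':
        payload = json.dumps(body).encode('utf-8')
    elif isinstance(body, bytes):
        payload = body  # already bytes: use the body as-is
    else:
        raise ValueError(f'no serializer for {content_type!r}')
    if content_encoding:
        payload = ENCODERS[content_encoding](payload)
    return payload


def decode_body(payload, content_type, content_encoding=None):
    """Undo the content encoding first, then deserialize by content type."""
    if content_encoding:
        payload = DECODERS[content_encoding](payload)
    if content_type == 'application/json':
        return json.loads(payload.decode('utf-8'))
    return payload  # unknown content type: hand back the decoded bytes


wire = encode_body({'id': 1, 'tags': ['a', 'b']}, 'application/json', 'gzip')
assert decode_body(wire, 'application/json', 'gzip') == {'id': 1, 'tags': ['a', 'b']}
```

The read side has to reverse the publish side in the opposite order, which is why the diagram decodes `content_encoding` before dispatching on `content_type`.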
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rejected/consumer.py (1)
1478-1488: ⚠️ Potential issue | 🟠 Major
Unsafe YAML loading allows arbitrary code execution.
`yaml.load(value)` without a `Loader` parameter can execute arbitrary Python code if a malicious YAML payload is received. While I see that `S506` is intentionally suppressed in `pyproject.toml` for this file, this remains a significant security risk when deserializing untrusted message bodies.
Consider using `yaml.safe_load(value)` unless you explicitly require loading arbitrary Python objects.
🔒 Proposed fix
     @staticmethod
     def _load_yaml_value(value):
         """Load an YAML string into an dict object.
         :param str value: The YAML string
         :rtype: any
         :raises: ConsumerException
         """
-        return yaml.load(value)
+        return yaml.safe_load(value)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rejected/consumer.py` around lines 1478 - 1488, The _load_yaml_value function currently calls yaml.load(value) which is unsafe for untrusted input; update _load_yaml_value to use yaml.safe_load(value) instead, and ensure any exceptions from parsing are caught and re-raised as the existing ConsumerException (or the surrounding error path) to preserve behavior and error context—refer to the _load_yaml_value function and ConsumerException symbol to locate and modify the implementation.
🧹 Nitpick comments (1)
tests/test_consumer.py (1)
118-122: Remove redundant import.
The `import json` on line 120 is unnecessary since `json` is already imported at the module level (line 3).
♻️ Proposed fix
     async def test_body_property(self):
         await self.run_consumer()
         # body is auto-deserialized from JSON since content_type is
         # application/json in mocks.PROPERTIES
-        import json
-
         self.assertEqual(self.obj.body, json.loads(mocks.BODY))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_consumer.py` around lines 118 - 122, Remove the redundant local "import json" in tests/test_consumer.py and rely on the module-level json import already present at the top of the file; specifically, delete the inline import that appears immediately before the assertion using json.loads(mocks.BODY) so the test continues to use the existing top-level json symbol.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/api_smart_consumer.rst`:
- Around line 1-7: The autodoc directive references the removed class
rejected.consumer.SmartConsumer causing Sphinx to fail; open
docs/api_smart_consumer.rst and either delete the file or replace the autoclass
block (the line containing ".. autoclass:: rejected.consumer.SmartConsumer" and
its :members:/:exclude-members: options) with plain text noting that
SmartConsumer has been removed/deprecated, a brief description of the migration
path or replacement API if any, and remove any remaining Sphinx autodoc
directives that reference SmartConsumer so the docs build no longer imports that
symbol.
In `@rejected/__init__.py`:
- Around line 12-18: The examples.py references a removed class SmartConsumer
from rejected.consumer which will raise AttributeError; update examples.py to
stop inheriting SmartConsumer and instead import/use Consumer (from
rejected.consumer) as the base, or restore a compatibility alias SmartConsumer =
Consumer inside rejected.consumer/__init__.py so examples.py can continue to use
SmartConsumer; locate the inheritance in examples.py and either change the base
class to Consumer and adjust any methods that relied on SmartConsumer-specific
APIs, or add the alias SmartConsumer = Consumer in the rejected.consumer module
to preserve the old name.
In `@rejected/consumer.py`:
- Around line 1613-1625: The HTTP schema fetch in the conditional that checks
uri.startswith(('http://', 'https://')) uses _requests.get(uri) with no timeout,
which can hang; fix by adding a sensible timeout argument (e.g., timeout=5 or a
configurable REQUEST_TIMEOUT value) to the _requests.get call in that block (and
surface or document the constant if you add one), and ensure any request
errors/timeouts are still translated into a ConsumerException (the existing
raising behavior around response.ok and the ConsumerException for failures
should remain intact).
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: cee61237-76cc-47ae-b245-9c9137e614d5
📒 Files selected for processing (10)
- docs/api.md
- docs/api_consumer.rst
- docs/api_smart_consumer.rst
- docs/consumer_howto.md
- pyproject.toml
- rejected/__init__.py
- rejected/config.py
- rejected/consumer.py
- tests/test_consumer.py
- tests/test_testing.py
💤 Files with no reviewable changes (2)
- docs/api.md
- docs/api_consumer.rst
🤖 This comment was posted by Claude on behalf of @gmr
PR Monitor Summary
Response to CodeRabbit Review
All three CodeRabbit comments addressed and resolved.
All 225 tests pass locally.
Move all SmartConsumer functionality (auto-deserializing body property, auto-serializing/encoding publish_message, and all codec helpers) directly into Consumer. Replace SmartConsumer, PublishingConsumer, and SmartPublishingConsumer with deprecated aliases that warn and delegate to Consumer.
Update docs and fix the test_body_property assertion to expect the deserialized body now that Consumer auto-deserializes JSON.
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Incorporates the avroconsumer library directly into rejected, so Avro support is available without an external package. Avro is gated behind `rejected[avro]` (fastavro>=1.7, requests>=2.28) and fully inert when fastavro is not installed; existing consumers are unaffected.
Changes to Consumer:
- Detects content-type 'application/vnd.apache.avro.datum' in the body property and auto-deserializes via fastavro when fastavro is available and self.message_type is set
- Detects same content-type + 'type' property in publish_message and auto-serializes via fastavro
- _avro_schemas: dict caches parsed schemas per message_type
- _avro_schema(message_type): returns cached schema, calling _load_avro_schema on first access
- _load_avro_schema(message_type): raises NotImplementedError; must be overridden or use the helper subclasses below
- _deserialize_avro / _serialize_avro: fastavro schemaless reader/writer
New classes:
- LocalSchemaConsumer: loads .avsc files from a configured schema_path
- RemoteSchemaConsumer: fetches schemas via HTTP from schema_uri_format
AVRO_DATUM_MIME_TYPE = 'application/vnd.apache.avro.datum' exported from the top-level rejected package.
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Removes LocalSchemaConsumer and RemoteSchemaConsumer. Consumer._load_avro_schema
now dispatches on the scheme of schema_uri_format:
file:///path/to/schemas/{0}.avsc -> read from disk
http(s)://registry/{0}.avsc -> HTTP GET
If schema_uri_format is not configured, NotImplementedError is raised,
requiring the consumer to override _load_avro_schema directly.
This is one config key, one code path, no subclasses needed.
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
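A minimal sketch of the scheme dispatch this commit describes, under stated assumptions: `load_schema` is a hypothetical free function rather than the actual `Consumer._load_avro_schema`, the standard library's `urllib` stands in for `requests`, `LookupError` stands in for `ConsumerException`, and the schema is returned as parsed JSON without being handed to fastavro.

```python
import json
import pathlib
import urllib.parse
import urllib.request


def load_schema(schema_uri_format, message_type, timeout=30):
    """Return the parsed JSON schema for message_type.

    schema_uri_format is a str.format template such as
    'file:///etc/schemas/{0}.avsc' or 'https://registry/{0}.avsc'.
    """
    if not schema_uri_format:
        # Mirrors the commit message: without the key, callers must override.
        raise NotImplementedError('schema_uri_format is not configured')
    uri = schema_uri_format.format(message_type)
    parsed = urllib.parse.urlparse(uri)
    if parsed.scheme == 'file':
        path = pathlib.Path(urllib.request.url2pathname(parsed.path))
        try:
            # No exists() pre-check: read and handle the failure instead.
            return json.loads(path.read_text(encoding='utf-8'))
        except FileNotFoundError as error:
            raise LookupError(f'schema not found: {uri}') from error
    if parsed.scheme in ('http', 'https'):
        # Always pass a timeout so a stalled registry cannot hang the consumer.
        with urllib.request.urlopen(uri, timeout=timeout) as response:
            return json.loads(response.read().decode('utf-8'))
    raise ValueError(f'unsupported scheme in {uri!r}')
```

Keeping the dispatch in one function is what lets a single config key replace the two helper subclasses; a consumer with unusual needs can still override the loader.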
…onsumer
These have been deprecated aliases for Consumer since 3.17 / 4.0. Removing them as part of the 4.0 release; use Consumer directly.
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
It's a framework concern, not an application setting, so it belongs
alongside sentry_dsn, queue, qos_prefetch etc. rather than inside
the config: sub-section.
Config change:
  # Before
  config:
    schema_uri_format: file:///etc/schemas/{0}.avsc
  # After
  schema_uri_format: file:///etc/schemas/{0}.avsc
Added schema_uri_format: str | None to ConsumerConfig. Consumer reads
it via self._process.consumer_config.schema_uri_format.
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
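To make the config move concrete, here is a rough sketch of a config model that reads `schema_uri_format` from the top level only. The dataclass below is a hypothetical, trimmed-down stand-in; the real `ConsumerConfig` in `rejected/config.py` has more fields and may be built differently, and `parse_consumer` is an invented helper.

```python
import dataclasses
from typing import Any, Dict, Optional


@dataclasses.dataclass
class ConsumerConfig:
    """Hypothetical stand-in for the framework's per-consumer config."""
    queue: str
    # Framework-level key: sits next to queue, not inside config.
    schema_uri_format: Optional[str] = None
    # Application settings passed through to the consumer untouched.
    config: Dict[str, Any] = dataclasses.field(default_factory=dict)


def parse_consumer(raw):
    """Build a ConsumerConfig, reading schema_uri_format from the top level."""
    return ConsumerConfig(
        queue=raw['queue'],
        schema_uri_format=raw.get('schema_uri_format'),
        config=dict(raw.get('config') or {}),
    )


after = parse_consumer({
    'queue': 'events',
    'schema_uri_format': 'file:///etc/schemas/{0}.avsc',
})
assert after.schema_uri_format == 'file:///etc/schemas/{0}.avsc'
```

In this sketch a key left under `config:` (the old layout) is simply not picked up as the framework setting, which is the practical consequence of the move for existing configuration files.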
Bugs:
- body property used falsy check (if self._message_body:) which
re-deserialized on every access when the body was 0, "", [], {}, or
False; replaced with a sentinel pattern (_UNSET) so any value
including falsy ones is cached correctly
- _auto_serialize YAML branch checked self.content_type (incoming
message) instead of the content_type parameter (outgoing); fixed
- _auto_serialize BS4 branch used inline ('text/html', 'text/xml')
instead of the BS4_MIME_TYPES constant
- _republish_dropped_message logged "Republishing due to
ProcessingException" — wrong method; fixed to include the reason
Quality:
- Removed unnecessary comments narrating WHAT in the body property
- Moved pathlib import to module level (no reason for lazy import)
- Fixed TOCTOU in file:// schema loading — removed exists() check,
catch FileNotFoundError directly
- Deduplicated properties.get('content_encoding') in publish_message
- Added warning log when Avro datum received without message_type
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
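The sentinel fix for the `body` property can be illustrated in isolation. This is a sketch, not the code from `rejected/consumer.py`: the `Message` class and its `decode_calls` counter are hypothetical, and only JSON decoding is shown.

```python
import json

# Unique marker: distinguishes "not computed yet" from any real (falsy) value.
_UNSET = object()


class Message:
    """Hypothetical stand-in for a consumer holding one delivered message."""

    def __init__(self, raw):
        self._raw = raw
        self._body = _UNSET
        self.decode_calls = 0  # instrumentation for this example only

    @property
    def body(self):
        # `if self._body:` would re-decode 0, "", [], {} and False every time.
        if self._body is _UNSET:
            self.decode_calls += 1
            self._body = json.loads(self._raw)
        return self._body


message = Message(b'0')
assert message.body == 0 and message.body == 0
assert message.decode_calls == 1
```

An identity check against a private object is used rather than `None` because `None` is itself a valid deserialized value (JSON `null`).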
- Remove docs/api_smart_consumer.rst referencing deleted SmartConsumer
- Update docs/consumer.rst to reflect consolidated Consumer class
- Update examples.py to use Consumer instead of SmartConsumer
- Add timeout=30 to requests.get() call in Avro schema loading
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
026e980 to 89bc0cc
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
rejected/consumer.py (2)
1478-1487: ⚠️ Potential issue | 🟠 Major
`yaml.load()` without `Loader` is deprecated and unsafe.
Using `yaml.load(value)` without specifying a `Loader` is deprecated since PyYAML 5.1 and allows arbitrary code execution via crafted YAML payloads. Since message bodies come from external sources, this is a security risk.
🛡️ Proposed fix to use safe_load
     @staticmethod
     def _load_yaml_value(value):
         """Load an YAML string into an dict object.
         :param str value: The YAML string
         :rtype: any
-        :raises: ConsumerException
         """
-        return yaml.load(value)
+        return yaml.safe_load(value)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rejected/consumer.py` around lines 1478 - 1487, The _load_yaml_value static method currently calls yaml.load(value) which is unsafe; replace that call with yaml.safe_load(value) (or yaml.load(value, Loader=yaml.SafeLoader)) to avoid arbitrary code execution when parsing untrusted message bodies, keeping the same return type and propagating ConsumerException as before if parsing fails.
1390-1403: ⚠️ Potential issue | 🟡 Minor
BeautifulSoup instantiation without explicit parser triggers warnings.
`bs4.BeautifulSoup(value)` without a parser argument causes a `GuessedAtParserWarning` and may behave inconsistently across systems depending on installed parsers.
🔧 Proposed fix to specify parser
-        return bs4.BeautifulSoup(value)
+        return bs4.BeautifulSoup(value, 'html.parser')
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rejected/consumer.py` around lines 1390 - 1403, The _load_bs4_value function instantiates BeautifulSoup without specifying a parser which triggers GuessedAtParserWarning; update the bs4.BeautifulSoup call inside _load_bs4_value to pass an explicit parser (for example "html.parser" or "lxml")—i.e., change the instantiation in _load_bs4_value to bs4.BeautifulSoup(value, "html.parser") (or use "lxml" if your environment requires it) so the parser choice is explicit and warnings are avoided.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 2525066b-4250-4823-b44c-289aeadd7265
📒 Files selected for processing (4)
- docs/api_smart_consumer.rst
- docs/consumer.rst
- examples.py
- rejected/consumer.py
💤 Files with no reviewable changes (1)
- docs/api_smart_consumer.rst
✅ Files skipped from review due to trivial changes (1)
- docs/consumer.rst
- yaml.load(value) → yaml.safe_load(value) to prevent arbitrary code execution when deserializing untrusted message bodies
- bs4.BeautifulSoup(value) → bs4.BeautifulSoup(value, 'html.parser') to avoid GuessedAtParserWarning and make the parser choice explicit
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Actionable comments posted: 1
🧹 Nitpick comments (1)
tests/test_consumer.py (1)
118-122: Remove redundant inline import of `json`.
The `json` module is already imported at the top of the file (line 3). The inline import inside the test method is unnecessary.
♻️ Proposed fix
     async def test_body_property(self):
         await self.run_consumer()
         # body is auto-deserialized from JSON since content_type is
         # application/json in mocks.PROPERTIES
-        import json
-
         self.assertEqual(self.obj.body, json.loads(mocks.BODY))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_consumer.py` around lines 118 - 122, Remove the redundant inline import of the json module inside the test; since json is already imported at the top of tests/test_consumer.py, delete the local "import json" in the test method and leave the assertion self.assertEqual(self.obj.body, json.loads(mocks.BODY)) unchanged (look for the test referencing self.obj.body and mocks.BODY to find the spot).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rejected/consumer.py`:
- Line 1487: Replace the unsafe yaml.load(value) call with a safe loader to
avoid arbitrary code execution: locate the yaml.load(value) invocation in
consumer.py and change it to use yaml.safe_load(value) (or yaml.load(value,
Loader=yaml.SafeLoader)) so untrusted message bodies are parsed safely; keep the
rest of the function logic unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 2665e2a7-f116-4d6c-b8af-6c7645088a6d
📒 Files selected for processing (12)
- docs/api.md
- docs/api_consumer.rst
- docs/api_smart_consumer.rst
- docs/consumer.rst
- docs/consumer_howto.md
- examples.py
- pyproject.toml
- rejected/__init__.py
- rejected/config.py
- rejected/consumer.py
- tests/test_consumer.py
- tests/test_testing.py
💤 Files with no reviewable changes (3)
- docs/api.md
- docs/api_consumer.rst
- docs/api_smart_consumer.rst
✅ Files skipped from review due to trivial changes (3)
- rejected/config.py
- pyproject.toml
- docs/consumer.rst
🚧 Files skipped from review as they are similar to previous changes (3)
- examples.py
- tests/test_testing.py
- docs/consumer_howto.md
Summary
Consolidates four consumer classes into one unified `Consumer`, removes deprecated aliases as part of the 4.0 release, and absorbs the `avroconsumer` library so Avro datum support is available without an external dependency.
Problem
The `Consumer` / `SmartConsumer` / `PublishingConsumer` / `SmartPublishingConsumer` hierarchy was confusing: the `Publishing*` variants had been deprecated stubs since 3.17, and `SmartConsumer` was the only class worth using but required knowing to extend it instead of `Consumer`. The `avroconsumer` library also lived as a separate package, requiring users to know about it and install it separately.
Solution
Consumer consolidation:
- `SmartConsumer`'s auto-serialize/deserialize `body` property and `publish_message` override merged directly into `Consumer`; all consumers now get content-type-aware body decoding and publishing for free
- `SmartConsumer`, `PublishingConsumer`, `SmartPublishingConsumer` removed entirely (4.0 break)
Avro support (`rejected[avro]`):
- `fastavro>=1.7` + `requests>=2.28` added as an optional extra
- If `fastavro` is not installed, the Avro code paths are completely inert
- `body` auto-deserializes Avro datums when `content-type: application/vnd.apache.avro.datum` and `message_type` is set
- `publish_message` auto-serializes to Avro when the same content-type + `type` property are present
- `_avro_schemas` dict caches parsed schemas per message type
- `_load_avro_schema(message_type)` dispatches on the `schema_uri_format` consumer config key:
  - `file:///path/to/schemas/{0}.avsc` → read from disk
  - `http(s)://registry/{0}.avsc` → HTTP GET
  - not configured → `NotImplementedError` (consumer must override `_load_avro_schema`)
- `schema_uri_format` is a top-level consumer config key (alongside `queue`, `sentry_dsn`), not inside `config:`
- `AVRO_DATUM_MIME_TYPE = 'application/vnd.apache.avro.datum'` exported from the top-level package
Impact
Breaking changes (4.0):
- `SmartConsumer`, `PublishingConsumer`, `SmartPublishingConsumer` removed; use `Consumer`
- `Consumer.body` now auto-deserializes by content-type (was raw bytes); consumers that accessed `self.body` expecting raw bytes and handled deserialization themselves should switch to `self._message.body`
All 225 tests pass.