4.0: Codec class, consolidated models, BaseConsumer + TransactionConsumer #60

Merged
gmr merged 22 commits into main from functional-consumer on Apr 3, 2026

Conversation

@gmr
Owner

@gmr gmr commented Apr 2, 2026

Summary

Major restructuring for rejected 4.0 — async Codec class, consolidated pydantic models, consumer hierarchy with backward-compat Consumer and concurrent TransactionConsumer.

Changes

Codec class (rejected/codecs.py)

  • Codec(schema_registry=None) — async encode/decode with content-type dispatch + Avro
  • SchemaRegistryConfig(type: Literal['file', 'http'], uri: str) replaces per-consumer schema_uri_format
  • httpx for async HTTP schema fetching (replaces requests)
  • Coroutine-safe schema cache guarded by asyncio.Lock (double-checked locking)
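
The double-check pattern named in the schema-cache bullet can be sketched roughly as follows. SchemaCache, fake_loader, and the dict layout are hypothetical names for illustration and are not taken from rejected/codecs.py. Note that asyncio.Lock serializes coroutines on one event loop; it does not by itself protect against access from other OS threads.

```python
import asyncio


class SchemaCache:
    """Cache schemas by name, loading each one at most once (sketch only)."""

    def __init__(self, loader):
        self._loader = loader  # hypothetical async callable: name -> schema
        self._schemas = {}
        self._lock = asyncio.Lock()

    async def get(self, name):
        # First check: the fast path takes no lock.
        schema = self._schemas.get(name)
        if schema is not None:
            return schema
        async with self._lock:
            # Second check: another task may have loaded it while we waited.
            schema = self._schemas.get(name)
            if schema is None:
                schema = await self._loader(name)
                self._schemas[name] = schema
            return schema


async def demo():
    calls = []

    async def fake_loader(name):
        calls.append(name)
        await asyncio.sleep(0)  # yield so the other tasks get a chance to run
        return {'name': name}

    cache = SchemaCache(fake_loader)
    results = await asyncio.gather(*(cache.get('user') for _ in range(5)))
    return calls, results


calls, results = asyncio.run(demo())
```

Five concurrent lookups for the same schema should trigger a single load, because the second check inside the lock sees the value stored by the first task.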

Models (rejected/models.py)

  • All pydantic models consolidated: config, message, processing context
  • ProcessingContext carries connection, channel, raw_body, message, measurement, result
  • Message is flat (no nested properties/method objects)
  • Result IntEnum for message disposition
  • SchemaRegistryConfig with Literal type validation

Consumer hierarchy (rejected/consumer.py)

  • _Consumer: base contract with execute(ctx), _pre_execute, _run_consumer
  • Consumer(_Consumer): backward-compat locked single-message, self._context for property access
  • TransactionConsumer(_Consumer): concurrent, receives ProcessingContext in prepare(ctx)/process(ctx)/finish(ctx)
  • publish_message is async, uses process Codec
  • _republish helper deduplicates dropped/error republishing
  • raw_body preserved for republishing (not decoded body)

Process (rejected/process.py)

  • Concurrent: no lock, no pending queue, _in_flight dict tracks ProcessingContext per delivery_tag
  • Counters replaced by per-message Measurement tags + cumulative aggregation
  • Async body decoding via Codec in invoke_consumer
  • String connections handled correctly (not silently skipped)

Type annotations

  • mypy strict on all modules
  • Full annotations on every file

Other

  • exceptions.py: standalone module
  • measurement.py: renamed from data.py
  • avro.py: deleted (merged into Codec)
  • httpx>=0.27 replaces requests in [avro] extra
  • ruff: skip-magic-trailing-comma, isort compat

Testing

208 tests pass. ruff clean. mypy clean on all modules.

Summary by CodeRabbit

  • New Features

    • Transaction-style concurrent consumer, rich async codec (JSON/msgpack/Avro/CSV/YAML/HTML) and per-message instrumentation.
  • Documentation

    • README streamlined (async consumer example removed/updated; optional Avro install hint added; coverage badge removed); legacy docs and examples deleted.
  • Refactor

    • Large internal rewrite for clearer APIs, structured message/context models, and stronger typing.
  • Chores

    • LICENSE year bumped to 2026; project tooling and type-checker configurations tightened.

gmr and others added 15 commits April 1, 2026 21:23
- Move all serialization/deserialization logic into rejected/codecs.py
  with stateless decode/encode/decode_avro/encode_avro functions
- Rename message.py to models.py for the pydantic Message model
- Split Consumer into BaseConsumer (framework plumbing) and Consumer
  (thin public API subclass preserving backward compatibility)
- Add FunctionalConsumer that passes a Message model to
  prepare/process/finish instead of using self.body/self.content_type
- Factor execute() into _pre_execute + _handle_execution + _run_consumer
  to share setup and error handling between Consumer and FunctionalConsumer

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
All pydantic models (Config, ConnectionConfig, ConnectionRef,
ConsumerConfig, PrometheusConfig, StatsdConfig, StatsConfig) moved
from config.py to models.py alongside the Message model.

config.py retains the Settings wrapper class, the load() function,
and re-exports all model classes for backward compatibility so
existing `config_module.Config` references continue to work.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Set skip-magic-trailing-comma = true and isort.split-on-trailing-comma
= false so ruff stops adding trailing commas everywhere. Reformatted
all files with the new setting.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
- Result: IntEnum replacing module-level int constants (MESSAGE_ACK,
  MESSAGE_DROP, etc.) — backward-compat re-exports in data.py
- Properties: pydantic model with from_pika() class method replacing
  the old __slots__-based Data subclass
- InternalMessage: pydantic model with from_pika() class method
  replacing the old Message Data subclass
- data.Message() and data.Properties() remain as backward-compatible
  factory functions that delegate to from_pika()
- Measurement stays in data.py (mutable state, not a model)
- Fixed fastavro references in consumer.py (codecs.fastavro)

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
- Annotate class attributes with ClassVar (HB_INTERVAL, STATE_CLOSED)
- Fix on_open_error/on_closed: proper pika callback signatures instead
  of bare *args/**kwargs
- Fix on_blocked/on_unblocked: use frame.Method[T] wrapper types
- Fix on_consumer_cancelled/on_confirmation/on_qos_set: frame.Method[T]
- Fix on_channel_closed: (Channel, Exception) per pika stubs, with
  isinstance check for ChannelClosed to extract reply_code/reply_text
- Fix on_return: Channel type instead of int
- Fix on_confirmation body: access method.method for the inner spec
- Add on_channel_open return type annotation
- Add explicit return None in _ssl_options
- Add type: ignore for pika stub quirks on blocked/unblocked callbacks
- Import pika.frame for Method wrapper type

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
pika doesn't ship py.typed or inline type stubs. The pika-stubs package
provides type info for pyright/basedpyright but mypy needs an explicit
override to suppress import-untyped errors.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
state.py: All methods, properties, __init__, and class attributes now
fully annotated. Fixes mypy no-untyped-call errors in connection.py.

connection.py: Restored type: ignore[arg-type] comments for three pika
stub mismatches that mypy also flags (blocked/unblocked callback
expects base Connection, ssl_options expects _DEFAULT sentinel).

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
codecs.py:
- All functions fully annotated (params and return types)
- Removed legacy plistlib compat (Python 3.11+ always has dumps/loads)
- Fixed DictReader generic type arg (DictReader[str])
- Removed unused type: ignore comments on optional imports (mypy
  ignores those modules via overrides)

pyproject.toml:
- Added bs4, umsgpack, fastavro to mypy ignore_missing_imports
- Added types-PyYAML and mypy to dev deps

connection.py:
- Restored type: ignore[arg-type] on 3 pika stub mismatches (needed
  because types-pika provides stubs that mypy DOES check)

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
log.py:
- CorrelationFilter.__init__ and filter fully annotated
- CorrelationAdapter with LoggerAdapter[Logger] generic param
- __init__ and process fully annotated

models.py:
- Callbacks fields annotated as Callable[..., Any]
- Message.connection and channel typed as Any (avoids circular import)
- headers dict fully parameterized
- Removed unused pika.channel TYPE_CHECKING import

pyproject.toml:
- Added yaml to mypy ignore_missing_imports (types-PyYAML not always
  available in all environments)

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Fully annotated for mypy strict mode:
- state.py: all methods, properties, ClassVar attributes
- connection.py: pika callback signatures with frame.Method[T],
  type: ignore[arg-type] for 3 pika stub quirks
- codecs.py: all functions with param/return types, removed legacy
  plistlib compat
- log.py: CorrelationFilter, CorrelationAdapter with LoggerAdapter
  generic param
- models.py: Callbacks with Callable[..., Any], Message fields typed
- exceptions.py: RejectedException.__init__/*args/**kwargs annotated,
  subclasses cleaned up (redundant __init__ removed)
- utils.py: all functions annotated, str() wrapper on getattr return
- measurement.py: Counter[str], dict type params, Generator return
- config.py: Settings with full dict type params, ItemsView/KeysView
- controller.py: run/signal handlers with FrameType, _mcp typed as
  MasterControlProgram | None
- mcp.py: all 30+ methods annotated, ClassVar constants, signal
  handlers with FrameType, typed instance attributes
- statsd.py: Client fully annotated including Callable for callback
- mixins.py: GarbageCollectorMixin with ClassVar, hasattr guard for
  super().on_finish()
- prometheus.py: dict type params, typing import added

pyproject.toml:
- mypy overrides for all untyped third-party modules
- types-PyYAML and mypy added to dev deps

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Major restructuring for rejected 4.0. See PR description for details.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Apr 2, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Replaces legacy data/config/docs with Pydantic models and typed APIs, extracts async Codec and Connection modules, restructures Consumer/Process around ProcessingContext and TransactionConsumer, removes Sphinx docs/examples, and adds extensive typing, Avro/httpx support, and updated tests.

Changes

| Cohort | File(s) | Summary |
|---|---|---|
| License & README | LICENSE, README.md | Bumped LICENSE year to 2009–2026; removed Codecov badge; README consumer example made async (async def process(self) -> None) and references self.body; added optional Avro install note; removed Async Consumer & Version History sections. |
| Docs removal | docs/Makefile, docs/conf.py, docs/configuration.rst, docs/consumer_howto.rst, docs/history.rst, docs/index.rst | Entire Sphinx build/config and multiple documentation pages removed. |
| Project tooling | pyproject.toml | Adjusted optional deps (Avro now uses httpx), added mypy/type stubs to dev group, and updated Ruff/isort/mypy settings and per-file ignores. |
| Core models & metrics | rejected/models.py, rejected/measurement.py | Added Pydantic models (Config, ConnectionConfig, ConsumerConfig, Message, ProcessingContext, Result enum) and Measurement for per-message instrumentation. |
| Codec/Serialization | rejected/codecs.py | New async Codec with encode/decode, compression (gzip/bzip2), many content-types (JSON/msgpack/pickle/plist/csv/html/yaml), Avro schemaless support with async schema loading/cache via httpx, and EncodeError/DecodeError. |
| Connection layer | rejected/connection.py | New Connection class wrapping pika.AsyncioConnection: lifecycle callbacks, channel management, QoS/consume, publisher confirms, blocked/unblocked, shutdown, and SSL/params helpers. |
| Consumer & processing refactor | rejected/consumer.py, rejected/process.py | Reworked consumer into _Consumer/TransactionConsumer/Consumer using ProcessingContext; moved serialization to Codec; Process now tracks in-flight contexts, decodes via Codec, and unifies ack/nack/requeue flows via Result enum. |
| Exceptions & compatibility exports | rejected/exceptions.py, rejected/__init__.py | New RejectedException hierarchy and exception subclasses; __init__ re-exports Message, Result, TransactionConsumer, and imports AVRO_DATUM_MIME_TYPE from codecs. |
| Config & settings | rejected/config.py | Replaced local Pydantic defs with re-exports from rejected.models; Settings typed; load() returns models.Config. |
| Removed examples & scripts | examples.py, utils/test_generator.py | Deleted example consumers and the test message generator script. |
| Typing & utils sweep | rejected/controller.py, rejected/log.py, rejected/mcp.py, rejected/mixins.py, rejected/state.py, rejected/statsd.py, rejected/prometheus.py, rejected/utils.py | Widespread typing additions, signature refinements, small guards (PID handling, on_finish), and minor refactors. |
| Tests | tests/... | Updated tests to use Pydantic models, ProcessingContext, Measurement, new exceptions, and new async signatures; many helpers/adapters adjusted accordingly. |
| Removed legacy data | rejected/data.py | Deleted legacy data module (Message/Properties/Measurement constants) in favor of models/measurement. |
| New modules to audit closely | rejected/codecs.py, rejected/connection.py, rejected/consumer.py, rejected/process.py, rejected/models.py | Significant new logic, async flows, Avro/schema fetching, and runtime behavior changes; review for correctness, concurrency, error handling, and resource cleanup. |

Sequence Diagram(s)

sequenceDiagram
    participant Broker as Message Broker
    participant Conn as Connection
    participant Proc as Process
    participant Codec as Codec
    participant Consumer as Consumer
    participant Ctx as ProcessingContext

    Broker->>Conn: deliver(method, properties, body)
    Conn->>Proc: on_delivery(method, properties, body)
    Proc->>Proc: create ProcessingContext (ctx)
    Proc->>Codec: decode(body, content_type, content_encoding, message_type)
    Codec-->>Proc: decoded_payload
    Proc->>Consumer: execute(ctx)
    Consumer->>Consumer: prepare(ctx) / process(ctx) / finish(ctx)
    alt success
        Consumer-->>Proc: Result.MESSAGE_ACK
    else error
        Consumer-->>Proc: Result.* (CONSUMER_/MESSAGE_/PROCESSING_/UNHANDLED)
    end
    Proc->>Conn: ack/nack/publish based on Result
    Conn->>Broker: basic_ack/basic_nack/basic_publish
sequenceDiagram
    participant App as CLI/App
    participant Ctrl as Controller
    participant MCP as MasterControlProgram
    participant Proc as Process
    participant Conn as Connection
    participant RMQ as RabbitMQ

    App->>Ctrl: start(args, config)
    Ctrl->>MCP: init(config)
    MCP->>Proc: spawn processes for consumers
    Proc->>Conn: Connection(config)
    Conn->>RMQ: establish AsyncioConnection
    RMQ-->>Conn: on_open -> channel created
    Conn->>RMQ: basic_consume(queue)
    RMQ-->>Proc: message delivery
    Proc->>Proc: invoke_consumer(ctx)
    Proc->>RMQ: ack/nack/publish

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

Poem

🐇 I nibble bytes and hop through queues,

Schemas snug in burrows new,
Async paws and codecs hum,
Connections leap, consumers drum,
A tidy warren, built for two.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 42.12% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The PR title accurately describes the main changes: introduction of a Codec class, consolidated models, and a refactored Consumer hierarchy (BaseConsumer/TransactionConsumer) for version 4.0. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
rejected/utils.py (1)

55-56: ⚠️ Potential issue | 🟡 Minor

Validate percentile bounds before indexing.

k is currently unbounded; k == 0 returns the last element and k > 100 can raise IndexError.

Proposed fix
 def percentile(values: list[float], k: int) -> float | None:
@@
     if not values:
         return None
+    if not 0 < k <= 100:
+        raise ValueError('k must be in the range 1..100')
     values.sort()
     index = (len(values) * (float(k) / 100)) - 1
     return values[math.ceil(index)]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rejected/utils.py` around lines 55 - 56, Validate percentile input k before
computing index: ensure k is within 1..100 (or explicitly define allowed range)
and raise a clear ValueError if not; after computing index = (len(values) *
(float(k) / 100)) - 1, clamp the resulting index into the valid range [0,
len(values)-1] before calling math.ceil and indexing into values to avoid k==0
returning the last element and k>100 causing IndexError. Refer to the local
variables index, values, and k in this fix.
🧹 Nitpick comments (2)
tests/test_data.py (1)

46-50: Add the exception-path check for track_duration.

Measurement.track_duration() appends in a finally block, but this suite
only exercises the happy path. A small assertRaises case here would lock in
the behavior that timings are still recorded when the wrapped block fails.

Proposed test addition
     def test_track_duration(self):
         with self.m.track_duration('timed'):
             pass
         self.assertEqual(len(self.m.durations['timed']), 1)
         self.assertGreaterEqual(self.m.durations['timed'][0], 0)
+
+    def test_track_duration_on_exception(self):
+        with self.assertRaises(RuntimeError):
+            with self.m.track_duration('timed'):
+                raise RuntimeError('boom')
+        self.assertEqual(len(self.m.durations['timed']), 1)
+        self.assertGreaterEqual(self.m.durations['timed'][0], 0)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_data.py` around lines 46 - 50, Extend the existing
test_track_duration in tests/test_data.py to also exercise the exception path:
within an assertRaises (e.g., self.assertRaises(ValueError)) use
self.m.track_duration('timed') as a context manager that raises inside the
block, then after the exception assert that self.m.durations['timed'] length
increased (or contains an entry) and that the recorded duration is >= 0, thereby
verifying Measurement.track_duration's finally block behavior; locate the test
by the test_track_duration method and the code under test by
Measurement.track_duration and self.m.durations.
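
The behavior this nitpick wants to lock in can be shown with a small stand-in. The Measurement class below is a hypothetical reduction written for this example (only durations and track_duration, using time.monotonic as an assumed clock); it is not the class from rejected/measurement.py.

```python
import contextlib
import time
from collections import defaultdict


class Measurement:
    """Stand-in with only the pieces needed to show the timing behavior."""

    def __init__(self):
        self.durations = defaultdict(list)

    @contextlib.contextmanager
    def track_duration(self, key):
        start = time.monotonic()
        try:
            yield
        finally:
            # Runs on success and on exception, so the timing is never lost.
            self.durations[key].append(time.monotonic() - start)


m = Measurement()
try:
    with m.track_duration('timed'):
        raise RuntimeError('boom')
except RuntimeError:
    pass  # the exception still propagates out of the context manager
```

Because the append sits in the finally block, the failed block above still leaves exactly one recorded duration under 'timed'.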
pyproject.toml (1)

133-137: Clarify the Ruff glob pattern for repository-wide suppression.

The pattern "*.py" = ["S101", "S301"] is ambiguous in Ruff. Without a pinned version, it may behave unexpectedly—prior to v0.5.0, *.py matched recursively; v0.5.0+ uses gitignore semantics where * doesn't cross directories. Since your codebase uses explicit **/*.py for other directories (tests/**/*.py, utils/**/*.py), use **/*.py here as well for clarity and consistent behavior across Ruff versions.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pyproject.toml` around lines 133 - 137, The Ruff suppression pattern `"*.py"
= ["S101", "S301"]` is ambiguous across Ruff versions; update that key to use a
recursive glob `"**/*.py"` so it matches files across all directories
consistently (matching how `tests/**/*.py` and `utils/**/*.py` are used) and
replace the `"*.py"` entry with `"**/*.py"` in pyproject.toml to ensure
repository-wide suppression behaves the same regardless of Ruff version.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@README.md`:
- Around line 44-53: The README example uses the wrong Consumer.process
signature — change the method on class Test (class Test(consumer.Consumer)) from
async def process(self, message: models.Message) to the no-arg async def
process(self) and update the body to reference the incoming message via the
consumer instance (e.g. use self.message.body) so the example matches the
consumer.Consumer API used in tests; if you intended an explicit message
parameter instead, switch to TransactionConsumer in the example.

In `@rejected/codecs.py`:
- Around line 113-114: The block that calls pickle.loads when content_type is in
PICKLE_MIME_TYPES is unsafe for untrusted data; change it to only use
pickle.loads behind an explicit, configurable opt-in (e.g., an
ENABLE_PICKLE_DESERIALIZATION boolean or runtime setting) and fail with a clear
error/log if deserialization is requested but the flag is false, document the
trust assumption for PICKLE_MIME_TYPES, and wrap the pickle.loads call in a
try/except to log and rethrow deserialization errors; alternatively implement or
call a safe deserializer (JSON/msgpack) for untrusted sources and reserve
pickle.loads for trusted/internal flows only.

In `@rejected/connection.py`:
- Around line 327-378: The docstring in the _ssl_options property contains a
typo ("prototcol"); update the expected ssl_options list to spell "protocol"
correctly in the docstring for the _ssl_options method (referencing the
_ssl_options property and config.ssl_options) so the documentation is accurate.

In `@rejected/exceptions.py`:
- Around line 26-45: The exception class's __init__/__str__/__repr__ currently
stores non-string args in self.value and then calls .format() in __str__, which
can raise a second exception; update __init__ to ensure self.value is always a
string (coerce args[0] to str when used as the default value) and/or change
__str__ to only call self.value.format(...) when isinstance(self.value, str)
(otherwise return str(self.value)); reference the __init__, __str__, __repr__,
self.value and self.args symbols when making this safe-change so formatting
never attempts to call .format on a non-string.
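
A minimal sketch of the string-coercion idea from the rejected/exceptions.py comment follows. SketchException and its value/kwargs handling are assumptions made for illustration and do not reproduce RejectedException's real signature. The try/except fallback in __str__ is an extra guard added by this sketch, since a stringified dict contains braces that .format() would otherwise try to interpret.

```python
class SketchException(Exception):
    """Hypothetical stand-in for the exception class under review."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args)
        value = kwargs.pop('value', args[0] if args else '')
        # Coerce so that __str__ can always call .format() on a string.
        self.value = str(value)
        self.kwargs = kwargs

    def __str__(self):
        try:
            return self.value.format(*self.args, **self.kwargs)
        except (IndexError, KeyError, ValueError):
            # Extra guard, not from the PR: text with stray braces
            # (for example a stringified dict) is returned unformatted.
            return self.value


plain = str(SketchException(404))
templated = str(SketchException('queue {name} failed', name='jobs'))
braces = str(SketchException({'a': 1}))
```

Coercion alone covers the non-string case (404); the fallback covers values whose string form is not a valid format template.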

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 0780a2f3-6424-464c-8fd7-e4d9fbf5b16b

📥 Commits

Reviewing files that changed from the base of the PR and between 8478c39 and c387251.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (37)
  • LICENSE
  • README.md
  • docs/Makefile
  • docs/conf.py
  • docs/configuration.rst
  • docs/consumer_howto.rst
  • docs/history.rst
  • docs/index.rst
  • examples.py
  • pyproject.toml
  • rejected/__init__.py
  • rejected/codecs.py
  • rejected/config.py
  • rejected/connection.py
  • rejected/consumer.py
  • rejected/controller.py
  • rejected/data.py
  • rejected/exceptions.py
  • rejected/log.py
  • rejected/mcp.py
  • rejected/measurement.py
  • rejected/mixins.py
  • rejected/models.py
  • rejected/process.py
  • rejected/prometheus.py
  • rejected/state.py
  • rejected/statsd.py
  • rejected/testing.py
  • rejected/utils.py
  • tests/test_consumer.py
  • tests/test_controller.py
  • tests/test_data.py
  • tests/test_mcp.py
  • tests/test_process.py
  • tests/test_statsd.py
  • tests/test_testing.py
  • utils/test_generator.py
💤 Files with no reviewable changes (9)
  • docs/index.rst
  • docs/conf.py
  • docs/history.rst
  • docs/configuration.rst
  • examples.py
  • docs/consumer_howto.rst
  • docs/Makefile
  • utils/test_generator.py
  • rejected/data.py

Comment thread README.md Outdated
Comment thread rejected/codecs.py Outdated
Comment thread rejected/connection.py Outdated
Comment thread rejected/exceptions.py
- Regenerate uv.lock without pypi.aweber.io URLs that caused
  DNS failures in GitHub Actions
- Fix README example to use correct Consumer.process() signature
  (no-arg, access message via self.body)
- Fix typo in connection.py docstring: prototcol -> protocol
- Coerce exception value to str in RejectedException.__init__
  to prevent .format() from failing on non-string args

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@gmr
Owner Author

gmr commented Apr 2, 2026

PR Monitor Summary

CI/CD Fixes

  • Regenerated uv.lock without pypi.aweber.io URLs that caused DNS resolution failures in all 4 GitHub Actions test jobs (3.11, 3.12, 3.13, 3.14)

CodeRabbit Review Feedback

  • README.md: Fixed Consumer.process() example to use correct no-arg signature with self.body access
  • connection.py: Fixed typo in docstring: "prototcol" -> "protocol"
  • exceptions.py: Coerced self.value to str in RejectedException.__init__ to prevent .format() from failing on non-string args
  • codecs.py (pickle.loads): Replied explaining this is an intentional design choice for trusted internal RabbitMQ consumers -- not changing behavior

All 4 review threads have been resolved. Waiting for CI re-run and CodeRabbit re-review.

gmr and others added 2 commits April 2, 2026 19:42
Add `from __future__ import annotations` to connection.py so that
pika.frame.Method[pika.spec.Connection.Blocked] and similar type
annotations are treated as strings rather than evaluated at class
definition time. Without this, Python 3.11-3.13 raise TypeError
because pika.frame.Method does not support __class_getitem__.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add `from __future__ import annotations` to codecs.py so that
csv.DictReader[str] return type annotation is treated as a string
rather than evaluated at runtime. csv.DictReader became subscriptable
in Python 3.12 but this project supports 3.11+.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
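
The failure mode behind the two commits above can be reproduced without pika. In this sketch, Frame is a hypothetical stand-in for any class that lacks __class_getitem__. The quoted annotation on on_blocked mimics what `from __future__ import annotations` does for every annotation in a module: the text is stored and never evaluated when the function is defined.

```python
class Frame:
    """Stand-in for a class that does not define __class_getitem__."""


def subscript_at_runtime():
    # Evaluating the subscript eagerly is what fails without the import.
    try:
        Frame[int]
    except TypeError as error:
        return str(error)
    return None


def on_blocked(frame: 'Frame[int]') -> None:
    # The quoted annotation is kept as text, so defining this function
    # never subscripts Frame at all.
    return None


error_message = subscript_at_runtime()
stored_annotation = on_blocked.__annotations__['frame']
```

Type checkers still read the subscripted form from the stubs, while the interpreter only ever sees a string.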
@gmr
Owner Author

gmr commented Apr 2, 2026

PR Monitor - Final Summary

All CI checks are now passing across Python 3.11, 3.12, 3.13, and 3.14. All CodeRabbit review threads have been addressed and resolved.

Commits pushed

  1. Regenerated uv.lock - Removed internal pypi.aweber.io URLs causing DNS failures in CI
  2. Fixed README example - Corrected Consumer.process() signature and removed an unused import
  3. Fixed connection.py typo - prototcol -> protocol in docstring
  4. Fixed exception safety - Coerced self.value to str in RejectedException
  5. Fixed pika.frame.Method subscript - Added from __future__ import annotations to connection.py for Python <3.14 compatibility
  6. Fixed csv.DictReader subscript - Added from __future__ import annotations to codecs.py for Python 3.11 compatibility

CodeRabbit feedback

  • 4/4 threads resolved (3 code fixes + 1 reply explaining intentional pickle.loads design choice)


@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rejected/connection.py`:
- Around line 65-82: The shutdown flow currently only flips state when no
channel exists, allowing on_open/on_channel_open/on_ready to continue startup
and call into processing; update shutdown() (and mirror the change in the other
startup/teardown paths mentioned) so that if self.is_shutting_down is set and
self.channel is None you close the underlying connection instead of only setting
STATE_SHUTTING_DOWN, and additionally add early-return guards in on_open,
on_channel_open and on_ready to short-circuit (no-op) when self.is_shutting_down
is True; reference the methods shutdown(), on_open(), on_channel_open(),
on_ready(), the flag is_shutting_down, the attribute channel, and
set_state/STATE_SHUTTING_DOWN when applying the change.
- Around line 164-178: The on_channel_closed handler currently deletes the
self.channel attribute which causes later checks like "if self.channel:" to
raise AttributeError; instead, update on_channel_closed (the method
on_channel_closed) to set self.channel = None so the attribute remains present
and correctly typed (Channel | None) for subsequent teardown logic and guards to
work.
- Around line 343-378: The method building SSL options should also respect a
top-level self.config.ssl boolean and must pass server_hostname into
pika.SSLOptions; update the logic in the SSL builder (references:
self.config.ssl_options, self.config.ssl, and the returned pika.SSLOptions) so
that if ssl_options is empty but self.config.ssl is truthy you still create an
SSLContext; when protocol == ssl.PROTOCOL_TLS_CLIENT use
ssl.create_default_context() to get secure defaults, otherwise use
ssl.SSLContext(protocol); keep existing
load_verify_locations/load_cert_chain/set_ciphers work, and finally return
pika.SSLOptions(context=context, server_hostname=self.config.host or
self.config.hostname) so hostname verification occurs during the TLS handshake.
- Around line 260-268: The handler on_consumer_cancelled currently uses an elif
that prevents closing self.channel when the broker cancels a consumer and
self.is_shutting_down is False; change the elif to an independent if so that
after calling self.set_state(self.STATE_SHUTTING_DOWN) the code still checks and
closes the channel (call self.channel.close() when self.channel is truthy),
ensuring the connection doesn’t remain half-shutdown.
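
The on_channel_closed finding in the list above comes down to the difference between deleting an attribute and clearing it. The class below is a hypothetical stand-in written for this example, not the Connection class from rejected/connection.py; the two method names exist only to contrast the approaches.

```python
class Connection:
    """Stand-in holding only a channel attribute."""

    def __init__(self):
        self.channel: object | None = object()

    def on_channel_closed_with_del(self):
        del self.channel  # the attribute no longer exists after this

    def on_channel_closed_with_none(self):
        self.channel = None  # the attribute stays and reads as "no channel"

    def has_channel(self):
        # Mirrors the kind of `if self.channel:` guard used during teardown.
        return bool(self.channel)


cleared = Connection()
cleared.on_channel_closed_with_none()

deleted = Connection()
deleted.on_channel_closed_with_del()
try:
    deleted.has_channel()
    guard_error = None
except AttributeError as error:
    guard_error = error
```

Setting the attribute to None keeps later truthiness checks working and matches a Channel | None annotation, whereas the deleted attribute turns the same guard into an AttributeError.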
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 5e5ed4e8-4152-4a33-b600-df4c04dd7f06

📥 Commits

Reviewing files that changed from the base of the PR and between c387251 and 2e982e8.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
  • README.md
  • rejected/connection.py
  • rejected/exceptions.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • rejected/exceptions.py
  • README.md


@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 4

♻️ Duplicate comments (4)
rejected/connection.py (4)

67-83: ⚠️ Potential issue | 🟠 Major

Shutdown during CONNECTING state doesn't prevent startup completion.

When shutdown() is called before a channel exists (e.g., during CONNECTING), the method only sets the state but doesn't close the underlying connection. The on_open() and on_channel_open() callbacks lack guards for is_shutting_down, allowing startup to complete and on_ready() to fire after a stop was requested.

Add early-return guards in on_open and on_channel_open when is_shutting_down is true, or close the connection here when no channel exists.

Suggested fix
     def shutdown(self) -> None:
         if self.is_shutting_down:
             LOGGER.debug('Connection %s is already shutting down', self.name)
             return

         self.set_state(self.STATE_SHUTTING_DOWN)
         LOGGER.debug('Connection %s is shutting down', self.name)
         if self.is_active and self.channel:
             LOGGER.debug(
                 'Connection %s is sending a Basic.Cancel to RabbitMQ',
                 self.name,
             )
             self.channel.basic_cancel(
                 self.consumer_tag, self.on_consumer_cancelled
             )
         elif self.channel:
             self.channel.close()
+        elif self.connection:
+            self.connection.close()

And add guards in callbacks:

def on_open(self, connection: asyncio_connection.AsyncioConnection) -> None:
    if self.is_shutting_down:
        connection.close()
        return
    # ... rest of method

def on_channel_open(self, channel: pika.channel.Channel) -> None:
    if self.is_shutting_down:
        channel.close()
        return
    # ... rest of method
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rejected/connection.py` around lines 67 - 83, The shutdown flow can race with
startup: shutdown() sets is_shutting_down but if called before a channel exists
the connection is never closed and on_open/on_channel_open proceed to complete
startup and call on_ready; fix by adding early-return guards that close
resources when shutdown is in progress and/or closing the connection when no
channel exists in shutdown(): in shutdown() detect the no-channel case and close
the underlying connection, and in on_open(self, connection) and
on_channel_open(self, channel) immediately close the passed connection/channel
and return if self.is_shutting_down to prevent completing startup and firing
on_ready.

345-380: ⚠️ Potential issue | 🟠 Major

Pass server_hostname to pika.SSLOptions for proper TLS hostname verification.

pika.SSLOptions requires server_hostname for hostname verification during the TLS handshake. Without it, the connection is vulnerable to MITM attacks. Additionally, when using ssl.PROTOCOL_TLS_CLIENT, consider using ssl.create_default_context() which provides secure defaults including certificate validation and hostname checking.

Suggested fix
-        if not self.config.ssl_options:
+        if not self.config.ssl_options and not getattr(
+            self.config, 'ssl', False
+        ):
             return None

         protocol = self.config.ssl_options.get(
             'protocol', ssl.PROTOCOL_TLS_CLIENT
         )
         if isinstance(protocol, str):
             protocol = getattr(ssl, protocol)
-        context = ssl.SSLContext(protocol)
+        if protocol == ssl.PROTOCOL_TLS_CLIENT:
+            context = ssl.create_default_context()
+        else:
+            context = ssl.SSLContext(protocol)

         # ... CA and cert loading unchanged ...

-        return pika.SSLOptions(context=context)
+        return pika.SSLOptions(
+            context=context,
+            server_hostname=self.config.ssl_options.get(
+                'server_hostname', self.config.host
+            ),
+        )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rejected/connection.py` around lines 345 - 380, The SSL options builder is
missing server_hostname when constructing pika.SSLOptions and should use a
secure default SSLContext; update the code that builds the context (where
protocol/SSLContext is created and returned via pika.SSLOptions) to create the
context with ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH) or, if
using SSLContext(protocol), enable hostname checking and set verify_mode, then
pass the peer hostname from self.config.ssl_options (e.g. server_hostname or
host) into pika.SSLOptions(..., server_hostname=...) so TLS certificate hostname
verification is performed.
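
The context selection described above can be sketched with only the standard-library `ssl` module. This is a minimal illustration, not the code in `rejected/connection.py`: the helper name `build_tls_context` and its parameter are hypothetical, and the `pika.SSLOptions` call appears only in a comment so the example runs without pika installed.

```python
import ssl


def build_tls_context(protocol=ssl.PROTOCOL_TLS_CLIENT):
    """Hypothetical helper: choose secure defaults for a client-side context."""
    if isinstance(protocol, str):
        # Allow names such as 'PROTOCOL_TLS_CLIENT' coming from a config file
        protocol = getattr(ssl, protocol)
    if protocol == ssl.PROTOCOL_TLS_CLIENT:
        # Loads the default CA certificates and enables hostname checking
        return ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
    context = ssl.SSLContext(protocol)
    # Other protocols may not verify peers by default, so opt in explicitly
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    return context


context = build_tls_context('PROTOCOL_TLS_CLIENT')
# The context would then be handed to pika together with the peer hostname,
# for example: pika.SSLOptions(context, server_hostname='rabbitmq.example.com')
```

Hostname checking on the context only takes effect when a `server_hostname` is supplied at handshake time, which is why the review asks for both changes together.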

262-270: ⚠️ Potential issue | 🟠 Major

Change elif to if to ensure channel closes after broker-initiated cancels.

When the broker initiates a cancel, is_shutting_down is False, so line 268 sets the state but the elif on line 269 is skipped, leaving the channel open. These should be independent checks.

Suggested fix
     def on_consumer_cancelled(
         self, _method: pika.frame.Method[pika.spec.Basic.CancelOk]
     ) -> None:
         """Invoked by pika when a ``Basic.CancelOk`` is received."""
         LOGGER.info('Connection %s consumer has been cancelled', self.name)
         if not self.is_shutting_down:
             self.set_state(self.STATE_SHUTTING_DOWN)
-        elif self.channel:
+        if self.channel:
             self.channel.close()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rejected/connection.py` around lines 262 - 270, The handler
on_consumer_cancelled currently uses an elif so when is_shutting_down is False
we set_state(self.STATE_SHUTTING_DOWN) but skip closing the channel; change the
elif self.channel: to an independent if self.channel: so the channel.close()
runs regardless of shutdown state. Update the block referencing
on_consumer_cancelled, is_shutting_down, set_state, STATE_SHUTTING_DOWN and
channel.close accordingly.
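
The control-flow difference can be shown in isolation. The sketch below uses a hypothetical `FakeChannel` stand-in and plain functions in place of the real handler; only the `if`/`elif` shape is taken from the review.

```python
class FakeChannel:
    """Stand-in for a pika channel that records whether close() ran."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def cancelled_with_elif(is_shutting_down, channel):
    if not is_shutting_down:
        is_shutting_down = True  # stands in for set_state(STATE_SHUTTING_DOWN)
    elif channel:
        channel.close()  # skipped whenever the first branch ran
    return is_shutting_down


def cancelled_with_if(is_shutting_down, channel):
    if not is_shutting_down:
        is_shutting_down = True
    if channel:
        channel.close()  # runs regardless of the prior state
    return is_shutting_down


# Broker-initiated cancel: the connection is not yet shutting down
channel_elif = FakeChannel()
cancelled_with_elif(False, channel_elif)

channel_if = FakeChannel()
cancelled_with_if(False, channel_if)
```

With `elif`, the broker-initiated case leaves `channel_elif.closed` false; with two independent checks, `channel_if.closed` becomes true.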

179-179: ⚠️ Potential issue | 🔴 Critical

Don't delete self.channel; set it to None instead.

self.channel is typed as Channel | None and checked with if self.channel: elsewhere (lines 74, 82, 269). Deleting the attribute will cause AttributeError on subsequent access. Set it to None to maintain type consistency.

Suggested fix
-        del self.channel
+        self.channel = None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rejected/connection.py` at line 179, Replace the deletion of the channel
attribute with a None assignment: wherever the code currently does "del
self.channel" (e.g., in the connection teardown/cleanup method), change it to
"self.channel = None" so the attribute remains present and matches its declared
type (Channel | None) for subsequent "if self.channel:" checks in the class.
🧹 Nitpick comments (1)
rejected/connection.py (1)

223-223: Consider setting self.connection = None instead of deleting.

Similar to the self.channel issue, deleting self.connection can cause AttributeError if any code path accesses it after failure. While the connection failure should trigger process shutdown, setting to None is safer and more consistent with the typed attribute pattern.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rejected/connection.py` at line 223, Replace the deletion of the connection
attribute with setting it to None: change the statement that does "del
self.connection" to "self.connection = None" so subsequent accesses won't raise
AttributeError; also ensure the owning class's attribute (self.connection) is
typed/treated as Optional (and mirror the same pattern used for self.channel) so
callers can safely check for None.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rejected/codecs.py`:
- Around line 103-109: When content_type equals AVRO_DATUM_MIME_TYPE but
required prerequisites (fastavro module or message_type) are missing, fail fast
instead of falling through; in the decode/encode paths where the current
conditional checks fastavro is not None and message_type (around the logic that
calls self._avro_schema and fastavro.schemaless_reader), raise a clear exception
(e.g., ValueError or a custom CodecError) indicating that Avro support is
unavailable and which prerequisite is missing (fastavro or message_type). Do the
same for the corresponding encode branch (the similar block referenced at lines
142-147) so both encode and decode consistently error when Avro prerequisites
are not satisfied.
- Around line 100-101: Replace uses of zlib for gzip handling: change the
decompression in the content_encoding == 'gzip' branch (where body =
zlib.decompress(body)) to use gzip.decompress(body) and likewise replace the
compression call (zlib.compress(body)) with gzip.compress(body); add import gzip
to the module imports so RFC 1952-compliant gzip streams are used; locate the
changes in the codecs.py functions that handle content_encoding and
response/body compression.
- Around line 180-184: The async HTTP client created lazily in
_load_http_schema() is never closed because the shutdown flow doesn't await
Codec.close(); update the shutdown/cleanup coroutine in the process module to
await codec.close() for every instantiated codec (or provide an async close_all
that iterates and awaits codec.close()), ensuring the Codec.close() async method
is called for codecs that created an httpx.AsyncClient so the HTTP client is
properly aclosed and set to None.
- Around line 98-124: The decode logic in codecs.py can raise raw exceptions
from bz2.decompress, zlib.decompress, pickle.loads, plistlib.loads,
yaml.safe_load, etc., which bypass the caller's expected DecodeError handling;
wrap each decoding branch (content_encoding handling and the branches that call
bz2.decompress, zlib.decompress, pickle.loads for PICKLE_MIME_TYPES,
plistlib.loads for 'application/x-plist', yaml.safe_load for YAML_MIME_TYPES,
and any other loaders like _load_json/_load_msgpack/_load_csv/_load_bs4) in a
try/except that catches the implementation-specific exceptions and re-raises a
unified DecodeError (preserving original exception info/message) so callers
always receive DecodeError; keep the existing fastavro path using
self._avro_schema and fastavro.schemaless_reader but also convert its errors to
DecodeError if needed.
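
The last item above, normalizing every failure to one exception type, can be sketched as follows. This is an illustration under assumptions rather than the `Codec` implementation: the `DecodeError` class is a local stand-in, the `decode` function is synchronous for brevity, and the `'bzip2'` encoding name is assumed.

```python
import bz2
import gzip
import json


class DecodeError(Exception):
    """Local stand-in for the library's decode exception."""


def decode(body, content_type, content_encoding=None):
    try:
        if content_encoding == 'gzip':
            body = gzip.decompress(body)
        elif content_encoding == 'bzip2':  # encoding name is an assumption
            body = bz2.decompress(body)
        if content_type == 'application/json':
            return json.loads(body)
        return body
    except Exception as error:
        # Callers only ever need to handle DecodeError; the original
        # exception stays reachable through __cause__
        raise DecodeError(
            f'Failed to decode {content_type}: {error}'
        ) from error


decoded = decode(gzip.compress(b'{"a": 1}'), 'application/json', 'gzip')
```

Chaining with `from error` keeps the underlying traceback available for logging while giving callers a single exception to catch.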


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 100e85b3-451a-4e21-a55f-482fad203f19

📥 Commits

Reviewing files that changed from the base of the PR and between 2e982e8 and e75b26d.

📒 Files selected for processing (2)
  • rejected/codecs.py
  • rejected/connection.py

connection.py:
- Add early-return guards in on_open/on_channel_open when
  is_shutting_down, preventing startup from continuing after
  shutdown is requested during CONNECTING state
- Close underlying connection when shutdown() is called before
  a channel exists (no channel to cancel/close)
- Replace `del self.channel` with `self.channel = None` in
  on_channel_closed to avoid AttributeError in teardown guards
- Change elif to if in on_consumer_cancelled so channel always
  closes after broker-initiated consumer cancels
- Use ssl.create_default_context() for PROTOCOL_TLS_CLIENT to
  get secure defaults (cert validation, hostname checking)
- Check self.config.ssl bool to enable TLS without explicit
  ssl_options dict
- Pass server_hostname to pika.SSLOptions for proper TLS
  hostname verification

codecs.py:
- Replace zlib with gzip module for gzip content_encoding to
  comply with RFC 1952 (proper gzip framing, not raw zlib)
- Wrap entire decode body in try/except to normalize all
  failures to DecodeError consistently
- Fail fast with clear error when Avro content_type is used
  but fastavro or message_type is missing (both encode/decode)

process.py:
- Wire codec.close() into stop_consumer so httpx.AsyncClient
  is properly closed during shutdown
- execute() now returns None, result stored on ctx.result

consumer.py:
- execute() return type changed from Result to None; result
  is now set on ctx.result

testing.py, test_consumer.py:
- Update callers to read ctx.result instead of execute() return

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@gmr
Owner Author

gmr commented Apr 3, 2026

🤖 This comment was posted by Claude on behalf of @gmr

PR Monitor Summary

Response to CodeRabbit's Review (8 threads addressed)

connection.py (4 threads):

  • Added early-return guards in on_open() and on_channel_open() when is_shutting_down is set, plus closing the underlying connection when shutdown() is called before a channel exists — prevents startup from racing past a shutdown request (resolved)
  • Replaced del self.channel with self.channel = None in on_channel_closed() to avoid AttributeError in subsequent teardown guards (resolved)
  • Changed elif to if in on_consumer_cancelled() so the channel always closes after broker-initiated consumer cancels (resolved)
  • SSL options: check self.config.ssl bool to enable TLS even without explicit ssl_options dict; use ssl.create_default_context() for PROTOCOL_TLS_CLIENT secure defaults; pass server_hostname to pika.SSLOptions for proper TLS hostname verification (resolved)

codecs.py (4 threads):

  • Replaced zlib.compress/decompress with gzip.compress/decompress for gzip content_encoding to comply with RFC 1952 (resolved)
  • Wrapped entire decode body in try/except to normalize all failures to DecodeError consistently (resolved)
  • Added fail-fast with clear error messages when Avro content_type is used but fastavro or message_type is missing, for both encode and decode paths (resolved)
  • Wired codec.close() into stop_consumer() in process.py so the httpx.AsyncClient is properly closed during shutdown (resolved)
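
For context on the first codecs.py item above, the framing difference between the two modules can be checked with the standard library alone. The payload is arbitrary.

```python
import gzip
import zlib

payload = b'{"hello": "world"}'

gzip_body = gzip.compress(payload)  # RFC 1952 framing, magic bytes 1f 8b
zlib_body = zlib.compress(payload)  # RFC 1950 framing, a different header

# zlib.decompress() with default settings expects a zlib header and
# rejects a gzip stream
try:
    zlib.decompress(gzip_body)
    zlib_reads_gzip = True
except zlib.error:
    zlib_reads_gzip = False

round_trip = gzip.decompress(gzip_body)
```

A body compressed by another client with real gzip framing would therefore fail under `zlib.decompress`, which is the interoperability problem the change addresses.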

Additional Changes (included in this push)

  • execute() return type changed from Result to None; result is now stored on ctx.result
  • Updated process.py, testing.py, and test_consumer.py callers accordingly


@coderabbitai coderabbitai Bot left a comment

🧹 Nitpick comments (3)
rejected/codecs.py (1)

263-272: Consider adding retry logic for HTTP schema fetches.

The HTTP schema loading uses a 30-second timeout but has no retry logic. For transient network failures, the first message requiring a schema will fail with DecodeError. This may be acceptable for the use case, but consider whether a retry with backoff would improve resilience.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rejected/codecs.py` around lines 263 - 272, The HTTP schema fetch in the
method using self._http_client (creating httpx.AsyncClient(timeout=30) and
awaiting response = await self._http_client.get(uri)) needs retry/backoff to
handle transient failures: wrap the GET in a small retry loop (e.g. 2–3
attempts) with exponential backoff delays, catch transient exceptions
(httpx.RequestError, timeouts) and non-200 responses, and only raise DecodeError
(for the message_type) after all retries fail; ensure the client creation
(self._http_client) remains reused and the retry logic targets the GET call and
response.status_code handling.
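
A retry loop of the kind described could look roughly like this. It is a sketch using only `asyncio`: `fetch_with_retry`, `flaky_fetch`, and the local `DecodeError` are hypothetical names, and `OSError` stands in for `httpx.RequestError` so the example runs without httpx installed.

```python
import asyncio


class DecodeError(Exception):
    """Local stand-in for the library's decode exception."""


async def fetch_with_retry(fetch, attempts=3, base_delay=0.01):
    """Await fetch() up to `attempts` times, doubling the delay each time."""
    last_error = None
    for attempt in range(attempts):
        try:
            return await fetch()
        except OSError as error:  # stand-in for httpx.RequestError
            last_error = error
            if attempt < attempts - 1:
                await asyncio.sleep(base_delay * 2 ** attempt)
    raise DecodeError(
        f'Schema fetch failed after {attempts} attempts'
    ) from last_error


calls = []


async def flaky_fetch():
    """Fails twice with a transient error, then succeeds."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError('transient failure')
    return '{"type": "record"}'


schema = asyncio.run(fetch_with_retry(flaky_fetch))
```

Only the request is retried; the caller still sees a single `DecodeError` once every attempt has failed.
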
rejected/testing.py (1)

228-250: Consider handling MESSAGE_REQUEUE result in test assertions.

The process_message method raises exceptions for CONSUMER_EXCEPTION, MESSAGE_EXCEPTION, PROCESSING_EXCEPTION, and UNHANDLED_EXCEPTION, but MESSAGE_REQUEUE and MESSAGE_DROP silently return the measurement. If a test expects an error but gets a requeue, the test would pass without indication. This may be intentional for the testing use case, but consider documenting this behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rejected/testing.py` around lines 228 - 250, The test currently ignores
models.Result.MESSAGE_REQUEUE (and MESSAGE_DROP) after await
self.consumer.execute(ctx) which can hide failures; update the post-execute
result handling (the block checking ctx.result) to explicitly handle
models.Result.MESSAGE_REQUEUE (and optionally models.Result.MESSAGE_DROP) by
raising a clear test exception (e.g., ConsumerRequeueException or AssertionError
with context) or by asserting the expected requeue/drop behavior so a requeue
doesn't silently make the test pass — locate the branch around ctx.result checks
in the same function where _capture_log and self.consumer._log_exception are set
and add the new case(s) there.
rejected/connection.py (1)

20-26: State constant STATE_CLOSED = 0x08 intentionally aliases STATE_STOPPED.

The STATE_CLOSED constant shares the same value (0x08) as the parent's STATE_STOPPED. This appears intentional since the STATES dict overrides the 0x08 label with 'Closed' for connection-specific semantics, and is_closed correctly delegates to is_stopped. However, this coupling could cause confusion if the parent class changes.

Consider using a distinct value (e.g., 0x09) or documenting this aliasing explicitly:

# Intentionally aliased to STATE_STOPPED for is_stopped/is_closed equivalence
STATE_CLOSED: typing.ClassVar[int] = state.State.STATE_STOPPED
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rejected/connection.py` around lines 20 - 26, The Connection class defines
STATE_CLOSED as 0x08 which aliases the parent state's STATE_STOPPED; update the
declaration of STATE_CLOSED in Connection to avoid accidental divergence or
clarify intent: either assign it explicitly from the parent (STATE_CLOSED =
state.State.STATE_STOPPED) with an inline comment like "Intentionally aliased to
STATE_STOPPED for is_stopped/is_closed equivalence", or choose a distinct value
(e.g., 0x09) and update STATES accordingly; ensure references to
Connection.STATES, is_closed and is_stopped remain consistent after the change.
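
The explicit-alias option can be illustrated with stand-in classes. `State` below is a hypothetical, simplified base class; only the constant names and the 0x08 value come from the review.

```python
class State:
    STATE_STOPPED = 0x08
    STATES = {0x08: 'Stopped'}

    def __init__(self):
        self.state = None

    @property
    def is_stopped(self):
        return self.state == self.STATE_STOPPED


class Connection(State):
    # Intentionally aliased to STATE_STOPPED for is_stopped/is_closed
    # equivalence; it follows the parent if the parent's value changes
    STATE_CLOSED = State.STATE_STOPPED
    STATES = {**State.STATES, STATE_CLOSED: 'Closed'}

    @property
    def is_closed(self):
        return self.is_stopped


conn = Connection()
conn.state = Connection.STATE_CLOSED
```

Deriving the constant from the parent removes the hard-coded 0x08 while keeping the connection-specific 'Closed' label.
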

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 414831a8-bf2e-45ce-9c25-aa5e03c54251

📥 Commits

Reviewing files that changed from the base of the PR and between e75b26d and b8d6db9.

📒 Files selected for processing (6)
  • rejected/codecs.py
  • rejected/connection.py
  • rejected/consumer.py
  • rejected/process.py
  • rejected/testing.py
  • tests/test_consumer.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/test_consumer.py

codecs.py: HTTP schema fetch retries 3 times with exponential backoff,
catching httpx.RequestError and non-200 responses before raising
DecodeError.

connection.py: STATE_CLOSED explicitly assigned from
state.State.STATE_STOPPED with comment clarifying the intentional
alias for is_closed/is_stopped equivalence.

testing.py: process_message now raises AssertionError on
MESSAGE_REQUEUE (unexpected in tests). MESSAGE_DROP returns normally
(valid test outcome). Uses match/case for clarity.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 4

♻️ Duplicate comments (1)
rejected/connection.py (1)

233-240: ⚠️ Potential issue | 🟠 Major

Don't delete self.connection from object state.

This recreates the same teardown hazard as deleting self.channel: later paths still assume the attribute exists. After on_failure(), a later shutdown() will reach Line 87 and raise AttributeError instead of becoming a no-op or clean close.

Minimal fix
     def on_failure(self) -> None:
         LOGGER.info('Connection failure, terminating connection')
         self.set_state(self.STATE_CLOSED)
         try:
             self.connection.close()
-        except (AttributeError, pika.exceptions.ConnectionWrongStateError):
+        except pika.exceptions.ConnectionWrongStateError:
             pass
-        del self.connection
         self.callbacks.on_connection_failure(self.name)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rejected/connection.py` around lines 233 - 240, on_failure() currently
deletes self.connection which causes later methods like shutdown() to raise
AttributeError; remove the del self.connection and instead leave the attribute
in place (or set self.connection = None) after attempting close so subsequent
code that references self.connection or checks its state won't fail; update the
on_failure method (and any related teardown code) to rely on self.connection
being present or None rather than deleted, referencing the on_failure() method
and shutdown() paths to verify no AttributeError is possible.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rejected/codecs.py`:
- Around line 127-128: The call site currently invokes _load_bs4(body) even when
content_type (from BS4_MIME_TYPES) is XML; update the call to pass content_type
(e.g., _load_bs4(body, content_type)) and change _load_bs4 to accept the
content_type parameter and choose the appropriate parser (use an XML parser for
text/xml vs an HTML parser for text/html, or raise/fallback if the XML parser is
unavailable). Modify references to _load_bs4 and its signature accordingly so
the correct parser is selected based on the passed content_type (and keep
BS4_MIME_TYPES membership logic intact).
- Around line 151-179: The encode() path currently can raise or silently skip
encoding (e.g., _encode_avro → _avro_schema/_load_avro_schema raising
DecodeError, missing umsgpack leaving non-bytes for 'application/msgpack', or
raw exceptions from json.dumps/pickle/plistlib/yaml/gzip/bz2); wrap the entire
serialization and compression block inside a try/except that catches any
Exception and re-raises as EncodeError (preserving the original exception
message), ensure that when content_type == 'application/msgpack' and umsgpack is
None you raise EncodeError (not leave body unencoded), and guarantee the final
body is bytes before applying content_encoding or returning (convert str→bytes
and raise EncodeError if conversion fails); reference encode(), _encode_avro,
_avro_schema/_load_avro_schema, EncodeError, umsgpack, PICKLE_MIME_TYPES,
YAML_MIME_TYPES, BS4_MIME_TYPES in your changes.
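
The parser selection in the first item above reduces to a small mapping. The function name `parser_for` is hypothetical, and BeautifulSoup itself is referenced only in a comment so the sketch runs without bs4 installed.

```python
def parser_for(content_type):
    """Return a BeautifulSoup parser name for a MIME type (hypothetical)."""
    if content_type == 'text/xml':
        return 'xml'  # BeautifulSoup's XML parser, which requires lxml
    return 'html.parser'  # HTML parser from the standard library


xml_parser = parser_for('text/xml')
html_parser = parser_for('text/html')
# Typical use: bs4.BeautifulSoup(body, parser_for(content_type))
```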

In `@rejected/connection.py`:
- Around line 105-111: The try/except around
self.connection.channel(on_open_callback=self.on_channel_open) only catches
pika.exceptions.ConnectionClosed but must also catch
pika.exceptions.ConnectionWrongStateError; update the except clause to handle
ConnectionWrongStateError the same way as ConnectionClosed (log via
LOGGER.warning, call self.set_state(self.STATE_CLOSED), invoke
self.callbacks.on_closed(self.name) and return) so opening a channel when the
connection is in the wrong state is handled consistently; reference the existing
handlers for ConnectionClosed and the later handling at lines 223-224 as the
desired behavior for ConnectionWrongStateError.
- Around line 205-231: The handler currently treats only reply_code <= 0 and 404
as fatal; update the logic in the channel-close handling (the block referencing
reply_code, reply_text, LOGGER, on_failure, is_running, is_shutting_down,
connection.channel, on_channel_open, set_state, STATE_CONNECTING) to also treat
protocol-level channel closure codes 403, 405, and 406 as terminal failures by
calling self.on_failure() (instead of falling through to the reopen branch);
keep existing behavior for shutdown (is_shutting_down) and non-fatal closures,
but ensure any ChannelClosed-like protocol error triggers on_failure
immediately.
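
The channel-close decision described in the last item can be sketched as a pure function. The names `classify_channel_close` and `TERMINAL_REPLY_CODES` are hypothetical, and the order of the checks is an assumption; the code meanings are the standard AMQP 0-9-1 reply-code names.

```python
# AMQP 0-9-1 channel-level reply codes treated as terminal in this sketch
TERMINAL_REPLY_CODES = {
    403: 'ACCESS_REFUSED',
    404: 'NOT_FOUND',
    405: 'RESOURCE_LOCKED',
    406: 'PRECONDITION_FAILED',
}


def classify_channel_close(reply_code, is_running, is_shutting_down):
    """Return 'fail', 'ignore', or 'reopen' for a closed channel."""
    if reply_code <= 0 or reply_code in TERMINAL_REPLY_CODES:
        return 'fail'  # the handler would call on_failure()
    if is_shutting_down:
        return 'ignore'  # expected closure during shutdown
    if is_running:
        return 'reopen'  # the handler would open a new channel
    return 'ignore'


decision = classify_channel_close(406, is_running=True, is_shutting_down=False)
```

Reopening after one of these codes would typically hit the same permission or configuration error again, which is the loop the review warns about.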


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 2bea62cc-e636-4374-bf12-be46b1e54339

📥 Commits

Reviewing files that changed from the base of the PR and between b8d6db9 and d22b034.

📒 Files selected for processing (3)
  • rejected/codecs.py
  • rejected/connection.py
  • rejected/testing.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • rejected/testing.py

- Remove pickle deserialization support (RCE vector)
- Validate message_type before formatting into schema registry URI
- Remove env var dump from Sentry exception reports
- Enforce TLS cert verification for non-default SSL protocols
- Fix stop_consumer not awaiting async shutdown() coroutine
- Fix reject() RuntimeError when no_ack=True during shutdown
- Fix STATE_PROCESSING collision with STATE_ACTIVE (0x04 -> 0x09)
- Fix GarbageCollectorMixin not reading settings from positional args
- Add UV_CONFIG_FILE=/dev/null to CI workflow
- Remove S301 ruff suppression (no longer needed without pickle)
- Delete stale files: .travis.yml, .codeclimate.yml, README.rst,
  requires/, init.d/
- Fix redundant import in tests/mocks.py

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
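
One way the message_type validation listed in the commit above could look, as a sketch only. The allow-list pattern, the `schema_uri` helper name, and the `{0}` placeholder in the URI template are all assumptions for illustration; the commit does not show the actual rule.

```python
import re

# Assumed allow-list: starts with a letter or digit, then letters, digits,
# dots, underscores, or hyphens. The real rule in the PR may differ.
_MESSAGE_TYPE = re.compile(r'[A-Za-z0-9][A-Za-z0-9._-]*')


def schema_uri(uri_template: str, message_type: str) -> str:
    """Format message_type into a schema URI only after validating it."""
    if not _MESSAGE_TYPE.fullmatch(message_type) or '..' in message_type:
        raise ValueError(f'Invalid message type: {message_type!r}')
    return uri_template.format(message_type)
```

Rejecting path separators and `..` before formatting keeps a message property supplied by a publisher from steering the file or HTTP lookup to an unintended location.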
- Pass content_type to _load_bs4 so XML content uses the xml parser
  instead of silently applying html.parser to text/xml
- Wrap encode() in try/except to maintain EncodeError contract,
  matching the existing pattern in decode()
- Extract _serialize and _compress helpers to reduce encode complexity
- Guard against missing umsgpack with explicit EncodeError
- Catch ConnectionWrongStateError alongside ConnectionClosed in on_open
- Treat AMQP reply codes 403, 405, 406 as terminal channel errors to
  prevent infinite reopen loops on configuration/permission failures

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@gmr
Owner Author

gmr commented Apr 3, 2026

🤖 This comment was posted by Claude on behalf of @gmr

PR Monitor Summary

CodeRabbit Review Feedback Addressed (commit 87439f7)

codecs.py:

  • ✅ Pass content_type to _load_bs4() so text/xml uses the XML parser instead of silently using html.parser (resolved)
  • ✅ Wrap encode() in try/except to maintain the EncodeError contract, matching the existing pattern in decode() (resolved)
  • ✅ Extract _serialize() and _compress() helpers to reduce encode() complexity below C901 threshold
  • ✅ Guard against missing umsgpack with explicit EncodeError instead of silently skipping encoding
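
The encode() items above can be sketched roughly as follows. This is a synchronous simplification (the PR's Codec is async), and the `EncodeError` class, the supported content types, and the helper signatures here are assumptions rather than the code in rejected/codecs.py.

```python
import gzip
import json


class EncodeError(Exception):
    """Stand-in for the codec's EncodeError."""


def _serialize(value, content_type: str) -> bytes:
    if content_type == 'application/json':
        return json.dumps(value).encode('utf-8')
    if content_type == 'application/msgpack':
        try:
            import umsgpack  # optional dependency
        except ImportError as error:
            # Fail loudly instead of silently skipping the encoding step.
            raise EncodeError('umsgpack is not installed') from error
        return umsgpack.packb(value)
    raise EncodeError(f'Unsupported content type: {content_type}')


def _compress(body: bytes, content_encoding=None) -> bytes:
    if content_encoding is None:
        return body
    if content_encoding == 'gzip':
        return gzip.compress(body)
    raise EncodeError(f'Unsupported content encoding: {content_encoding}')


def encode(value, content_type: str, content_encoding=None) -> bytes:
    # Callers only ever see EncodeError, whatever the underlying failure was.
    try:
        return _compress(_serialize(value, content_type), content_encoding)
    except EncodeError:
        raise
    except Exception as error:
        raise EncodeError(str(error)) from error
```

Splitting serialization and compression into helpers leaves encode() with a single try/except, which is what keeps its branch count low.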

connection.py:

  • ✅ Catch ConnectionWrongStateError alongside ConnectionClosed in on_open (resolved)
  • ✅ Treat AMQP reply codes 403, 405, 406 as terminal channel errors to prevent infinite reopen loops on configuration/permission failures (resolved)
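
A small sketch of the terminal-error check described above. The constant and function names are hypothetical; only the three reply codes come from the summary, and their names are the standard AMQP 0-9-1 ones.

```python
# Reply codes that a channel reopen will not fix.
TERMINAL_REPLY_CODES = frozenset({
    403,  # access-refused
    405,  # resource-locked
    406,  # precondition-failed
})


def should_reopen_channel(reply_code: int) -> bool:
    """Return False when the channel closed for a terminal reason."""
    return reply_code not in TERMINAL_REPLY_CODES
```

A channel-closed callback could consult this before scheduling a reopen, so a permission or configuration failure stops the process instead of looping.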

All 208 tests pass locally across the changes. Waiting for CI and CodeRabbit re-review on the new commit.

@gmr gmr merged commit f581ab9 into main Apr 3, 2026
5 checks passed
@gmr gmr deleted the functional-consumer branch April 3, 2026 14:45