Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 50 additions & 14 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -1,32 +1,68 @@
# Rejected
# CLAUDE.md

Rejected is a Python RabbitMQ Consumer Framework and Controller Daemon.
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project

Rejected is a Python RabbitMQ Consumer Framework and Controller Daemon. It manages consumer processes that connect to RabbitMQ, receive messages, and process them with user-defined consumer classes.

## Development

```bash
UV_CONFIG_FILE=/dev/null uv sync --all-groups # Install all dependencies
UV_CONFIG_FILE=/dev/null uv run coverage run # Run tests with coverage
UV_CONFIG_FILE=/dev/null uv run coverage report # View coverage report
UV_CONFIG_FILE=/dev/null uv run pre-commit run -a # Run linting
UV_CONFIG_FILE=/dev/null uv sync --all-groups # Install all dependencies
UV_CONFIG_FILE=/dev/null uv run coverage run # Run all tests with coverage
UV_CONFIG_FILE=/dev/null uv run coverage report # View coverage report
UV_CONFIG_FILE=/dev/null uv run pre-commit run -a # Run linting (ruff format + ruff check)
```

## Documentation
To run a single test:
```bash
UV_CONFIG_FILE=/dev/null .venv/bin/python -m unittest tests.test_consumer.ConsumerExecuteTests.test_execute_calls_process
```

Documentation:
```bash
UV_CONFIG_FILE=/dev/null uv run mkdocs serve # Serve docs locally at http://127.0.0.1:8000
UV_CONFIG_FILE=/dev/null uv run mkdocs build # Build docs to site/
UV_CONFIG_FILE=/dev/null uv run mkdocs serve # Serve docs locally at http://127.0.0.1:8000
UV_CONFIG_FILE=/dev/null uv run mkdocs build # Build docs to site/
```

## Important Notes

- Use `UV_CONFIG_FILE=/dev/null` for all uv commands — this repo is on github.com, not the internal AWeber PyPI
- Tests use unittest discovery with `-s tests -t .` to support relative imports in the test package
- `rejected/consumer.py` has intentional suppression of B904/BLE001/C901 for error handling complexity
- `rejected/process.py` has intentional C901 suppression

## Code Style

- Ruff for linting and formatting (configured in pyproject.toml)
- Single quotes for strings
- 79 character line length
- `rejected/consumer.py` has intentional suppression of B904/C901 for Tornado gen.Return pattern

## Notes
## Architecture

- Use `UV_CONFIG_FILE=/dev/null` for all uv commands — this repo is on github.com, not the internal AWeber PyPI
- Tests use unittest discovery with `-s tests -t .` to support relative imports in the test package
- `pkg_resources` has been replaced with `importlib.metadata` throughout
### Process Model

`Controller` (CLI entry point) → `MasterControlProgram` (MCP) → N × `Process` (multiprocessing.Process)

Each `Process` runs an asyncio event loop with one or more pika `AsyncioConnection`s. The MCP polls child processes via `SIGPROF`/`SIGALRM` signals, collects stats via a multiprocessing Queue, and manages process lifecycle (spawn, kill unresponsive, restart on errors).

### Message Flow

1. `Connection` receives a message from RabbitMQ via pika callbacks
2. `Process.on_message` builds a `ProcessingContext` (Pydantic model) and schedules `invoke_consumer`
3. `invoke_consumer` decodes the body via `Codec`, then calls `consumer.execute(ctx)`
4. `_Consumer.execute` runs pre-validation (message type, retry limits), then delegates to `_run_consumer`
5. `Consumer._run_consumer` acquires a lock and calls `prepare()` → `process()` → `finish()`; `TransactionConsumer._run_consumer` calls them without a lock, passing `ctx` as an argument
6. `Process.on_processed` handles the result: ack, nack, requeue, or republish based on the `Result` enum

### Key Module Responsibilities

- **consumer.py**: `_Consumer` base, `Consumer` (locked, self.body-style), `TransactionConsumer` (concurrent, ctx-style)
- **codecs.py**: `Codec` class handles encode/decode dispatch by content_type/content_encoding, plus async Avro schema loading
- **connection.py**: Wraps pika `AsyncioConnection`, manages channel lifecycle, QoS, consumer tags
- **process.py**: `Process(multiprocessing.Process)` — the per-consumer child process with asyncio event loop
- **mcp.py**: `MasterControlProgram` — parent process that spawns/monitors/polls child processes
- **models.py**: All Pydantic models — `Config`, `ConsumerConfig`, `ConnectionConfig`, `Message`, `ProcessingContext`, `Result`
- **state.py**: `State` base class with state machine (INITIALIZING → CONNECTING → IDLE → ACTIVE → SHUTTING_DOWN → STOPPED)
- **testing.py**: `AsyncTestCase(IsolatedAsyncioTestCase)` for consumer unit tests with mocked RabbitMQ
33 changes: 27 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,19 @@ each run in an isolated process. It has the ability to collect statistical
data from the consumer processes and report on it.

[![Version](https://img.shields.io/pypi/v/rejected.svg?)](https://pypi.python.org/pypi/rejected)
[![Python](https://img.shields.io/pypi/pyversions/rejected.svg)](https://pypi.python.org/pypi/rejected)
[![License](https://img.shields.io/pypi/l/rejected.svg?)](https://github.com/gmr/rejected/blob/main/LICENSE)

## Features

- Async consumers built on `asyncio`
- Automatic exception handling including connection management and consumer restarting
- Smart consumer classes that can automatically decode and deserialize message bodies based upon message headers
- Metrics logging and submission to statsd and InfluxDB
- Smart consumer classes that automatically decode and deserialize message bodies based on message headers
- Concurrent message processing with `TransactionConsumer`
- Metrics via statsd and/or Prometheus
- Built-in profiling of consumer code
- Ability to write asynchronous code in consumers allowing for parallel communication with external resources
- Avro schema support with file and HTTP schema registries
- YAML and TOML configuration file support

## Installation

Expand All @@ -29,9 +33,11 @@ pip install rejected
For optional features:

```bash
pip install rejected[avro] # Avro support
pip install rejected[html] # HTML message body support
pip install rejected[msgpack] # MessagePack support
pip install rejected[avro] # Avro datum serialization
pip install rejected[html] # HTML message body support
pip install rejected[msgpack] # MessagePack support
pip install rejected[prometheus] # Prometheus metrics exporter
pip install rejected[sentry] # Sentry error reporting
```

## Documentation
Expand All @@ -52,3 +58,18 @@ class Test(consumer.Consumer):
async def process(self) -> None:
LOGGER.debug('In Test.process: %s', self.body)
```

For concurrent message processing, use `TransactionConsumer`:

```python
from rejected import consumer, models
import logging

LOGGER = logging.getLogger(__name__)


class Test(consumer.TransactionConsumer):

async def process(self, ctx: models.ProcessingContext) -> None:
LOGGER.debug('Processing: %s', ctx.message.body)
```
94 changes: 88 additions & 6 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,104 @@

The primary base classes for building message consumers.

### Consumer

::: rejected.consumer.Consumer
options:
members:
- process
- prepare
- finish
- on_finish
- initialize
- shutdown
- on_blocked
- on_unblocked
- on_confirmation
- publish_message
- set_sentry_context
- unset_sentry_context
- send_exception_to_sentry
- require_setting
- stats_add_duration
- stats_incr
- stats_set_tag
- stats_set_value
- stats_track_duration
- body
- app_id
- content_encoding
- content_type
- correlation_id
- exchange
- expiration
- headers
- message_id
- message_type
- name
- priority
- properties
- redelivered
- reply_to
- returned
- routing_key
- settings
- timestamp
- user_id

### TransactionConsumer

::: rejected.consumer.TransactionConsumer
options:
members:
- process
- prepare
- finish
- initialize
- shutdown

## Exceptions

::: rejected.consumer.ConsumerException
::: rejected.exceptions.ConsumerException

::: rejected.exceptions.MessageException

::: rejected.exceptions.ProcessingException

::: rejected.exceptions.RejectedException

## Models

::: rejected.consumer.MessageException
### Message

::: rejected.consumer.ProcessingException
::: rejected.models.Message

::: rejected.consumer.RejectedException
### ProcessingContext

::: rejected.models.ProcessingContext

### Result

::: rejected.models.Result

## Testing

::: rejected.testing.AsyncTestCase
options:
members:
- get_consumer
- get_settings
- create_context
- process_message
- published_messages
- measurement

::: rejected.testing.PublishedMessage

## Measurement

::: rejected.measurement.Measurement

## Data
## Mixins

::: rejected.data.Measurement
::: rejected.mixins.GarbageCollectorMixin
7 changes: 0 additions & 7 deletions docs/api_consumer.rst

This file was deleted.

5 changes: 0 additions & 5 deletions docs/api_data.rst

This file was deleted.

5 changes: 0 additions & 5 deletions docs/api_testing.rst

This file was deleted.

35 changes: 0 additions & 35 deletions docs/cli.rst

This file was deleted.

Loading
Loading