|
1 | | -# Rejected |
| 1 | +# CLAUDE.md |
2 | 2 |
|
3 | | -Rejected is a Python RabbitMQ Consumer Framework and Controller Daemon. |
| 3 | +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. |
| 4 | + |
| 5 | +## Project |
| 6 | + |
| 7 | +Rejected is a Python RabbitMQ Consumer Framework and Controller Daemon. It manages consumer processes that connect to RabbitMQ, receive messages, and process them with user-defined consumer classes. |
4 | 8 |
|
5 | 9 | ## Development |
6 | 10 |
|
7 | 11 | ```bash |
8 | | -UV_CONFIG_FILE=/dev/null uv sync --all-groups # Install all dependencies |
9 | | -UV_CONFIG_FILE=/dev/null uv run coverage run # Run tests with coverage |
10 | | -UV_CONFIG_FILE=/dev/null uv run coverage report # View coverage report |
11 | | -UV_CONFIG_FILE=/dev/null uv run pre-commit run -a # Run linting |
| 12 | +UV_CONFIG_FILE=/dev/null uv sync --all-groups # Install all dependencies |
| 13 | +UV_CONFIG_FILE=/dev/null uv run coverage run # Run all tests with coverage |
| 14 | +UV_CONFIG_FILE=/dev/null uv run coverage report # View coverage report |
| 15 | +UV_CONFIG_FILE=/dev/null uv run pre-commit run -a # Run linting (ruff format + ruff check) |
12 | 16 | ``` |
13 | 17 |
|
14 | | -## Documentation |
| 18 | +To run a single test: |
| 19 | +```bash |
| 20 | +UV_CONFIG_FILE=/dev/null .venv/bin/python -m unittest tests.test_consumer.ConsumerExecuteTests.test_execute_calls_process |
| 21 | +``` |
15 | 22 |
|
| 23 | +Documentation: |
16 | 24 | ```bash |
17 | | -UV_CONFIG_FILE=/dev/null uv run mkdocs serve # Serve docs locally at http://127.0.0.1:8000 |
18 | | -UV_CONFIG_FILE=/dev/null uv run mkdocs build # Build docs to site/ |
| 25 | +UV_CONFIG_FILE=/dev/null uv run mkdocs serve # Serve docs locally at http://127.0.0.1:8000 |
| 26 | +UV_CONFIG_FILE=/dev/null uv run mkdocs build # Build docs to site/ |
19 | 27 | ``` |
20 | 28 |
|
| 29 | +## Important Notes |
| 30 | + |
| 31 | +- Use `UV_CONFIG_FILE=/dev/null` for all uv commands — this repo is on github.com, not the internal AWeber PyPI |
| 32 | +- Tests use unittest discovery with `-s tests -t .` to support relative imports in the test package |
| 33 | +- `rejected/consumer.py` has intentional suppression of B904/BLE001/C901 for error handling complexity |
| 34 | +- `rejected/process.py` has intentional C901 suppression |
| 35 | + |
21 | 36 | ## Code Style |
22 | 37 |
|
23 | 38 | - Ruff for linting and formatting (configured in pyproject.toml) |
24 | 39 | - Single quotes for strings |
25 | 40 | - 79 character line length |
26 | | -- `rejected/consumer.py` has intentional suppression of B904/C901 for Tornado gen.Return pattern |
27 | 41 |
|
28 | | -## Notes |
| 42 | +## Architecture |
29 | 43 |
|
30 | | -- Use `UV_CONFIG_FILE=/dev/null` for all uv commands — this repo is on github.com, not the internal AWeber PyPI |
31 | | -- Tests use unittest discovery with `-s tests -t .` to support relative imports in the test package |
32 | | -- `pkg_resources` has been replaced with `importlib.metadata` throughout |
| 44 | +### Process Model |
| 45 | + |
| 46 | +`Controller` (CLI entry point) → `MasterControlProgram` (MCP) → N × `Process` (multiprocessing.Process) |
| 47 | + |
| 48 | +Each `Process` runs an asyncio event loop with one or more pika `AsyncioConnection`s. The MCP polls child processes via `SIGPROF`/`SIGALRM` signals, collects stats via a multiprocessing Queue, and manages process lifecycle (spawn, kill unresponsive, restart on errors). |
| 49 | + |
| 50 | +### Message Flow |
| 51 | + |
| 52 | +1. `Connection` receives a message from RabbitMQ via pika callbacks |
| 53 | +2. `Process.on_message` builds a `ProcessingContext` (Pydantic model) and schedules `invoke_consumer` |
| 54 | +3. `invoke_consumer` decodes the body via `Codec`, then calls `consumer.execute(ctx)` |
| 55 | +4. `_Consumer.execute` runs pre-validation (message type, retry limits), then delegates to `_run_consumer` |
| 56 | +5. `Consumer._run_consumer` acquires a lock and calls `prepare()` → `process()` → `finish()`; `TransactionConsumer._run_consumer` calls them without a lock, passing `ctx` as an argument |
| 57 | +6. `Process.on_processed` handles the result: ack, nack, requeue, or republish based on the `Result` enum |
| 58 | + |
| 59 | +### Key Module Responsibilities |
| 60 | + |
| 61 | +- **consumer.py**: `_Consumer` base, `Consumer` (locked, self.body-style), `TransactionConsumer` (concurrent, ctx-style) |
| 62 | +- **codecs.py**: `Codec` class handles encode/decode dispatch by content_type/content_encoding, plus async Avro schema loading |
| 63 | +- **connection.py**: Wraps pika `AsyncioConnection`, manages channel lifecycle, QoS, consumer tags |
| 64 | +- **process.py**: `Process(multiprocessing.Process)` — the per-consumer child process with asyncio event loop |
| 65 | +- **mcp.py**: `MasterControlProgram` — parent process that spawns/monitors/polls child processes |
| 66 | +- **models.py**: All Pydantic models — `Config`, `ConsumerConfig`, `ConnectionConfig`, `Message`, `ProcessingContext`, `Result` |
| 67 | +- **state.py**: `State` base class with state machine (INITIALIZING → CONNECTING → IDLE → ACTIVE → SHUTTING_DOWN → STOPPED) |
| 68 | +- **testing.py**: `AsyncTestCase(IsolatedAsyncioTestCase)` for consumer unit tests with mocked RabbitMQ |
0 commit comments