Skip to content

Commit 2c08445

Browse files
authored
Merge pull request #63 from gmr/feature/max-messages
Add -n/--max-messages flag for debug mode
2 parents c69633e + a367aad commit 2c08445

18 files changed

Lines changed: 170 additions & 98 deletions

CLAUDE.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,12 @@ Each `Process` runs an asyncio event loop with one or more pika `AsyncioConnecti
5353
2. `Process.on_message` builds a `ProcessingContext` (Pydantic model) and schedules `invoke_consumer`
5454
3. `invoke_consumer` decodes the body via `Codec`, then calls `consumer.execute(ctx)`
5555
4. `_Consumer.execute` runs pre-validation (message type, retry limits), then delegates to `_run_consumer`
56-
5. `Consumer._run_consumer` acquires a lock and calls `prepare()``process()``finish()`; `TransactionConsumer._run_consumer` calls them without a lock, passing `ctx` as an argument
56+
5. `Consumer._run_consumer` acquires a lock and calls `prepare()``process()``finish()`; `FunctionalConsumer._run_consumer` calls them without a lock, passing `ctx` as an argument
5757
6. `Process.on_processed` handles the result: ack, nack, requeue, or republish based on the `Result` enum
5858

5959
### Key Module Responsibilities
6060

61-
- **consumer.py**: `_Consumer` base, `Consumer` (locked, self.body-style), `TransactionConsumer` (concurrent, ctx-style)
61+
- **consumer.py**: `_Consumer` base, `Consumer` (locked, self.body-style), `FunctionalConsumer` (concurrent, ctx-style)
6262
- **codecs.py**: `Codec` class handles encode/decode dispatch by content_type/content_encoding, plus async Avro schema loading
6363
- **connection.py**: Wraps pika `AsyncioConnection`, manages channel lifecycle, QoS, consumer tags
6464
- **process.py**: `Process(multiprocessing.Process)` — the per-consumer child process with asyncio event loop

README.md

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,14 @@ each run in an isolated process. It has the ability to collect statistical
1010
data from the consumer processes and report on it.
1111

1212
[![Version](https://img.shields.io/pypi/v/rejected.svg?)](https://pypi.python.org/pypi/rejected)
13-
[![Python](https://img.shields.io/pypi/pyversions/rejected.svg)](https://pypi.python.org/pypi/rejected)
1413
[![License](https://img.shields.io/pypi/l/rejected.svg?)](https://github.com/gmr/rejected/blob/main/LICENSE)
1514

1615
## Features
1716

1817
- Async consumers built on `asyncio`
1918
- Automatic exception handling including connection management and consumer restarting
2019
- Smart consumer classes that automatically decode and deserialize message bodies based on message headers
21-
- Concurrent message processing with `TransactionConsumer`
20+
- Concurrent message processing with `FunctionalConsumer`
2221
- Metrics via statsd and/or Prometheus
2322
- Built-in profiling of consumer code
2423
- Avro schema support with file and HTTP schema registries
@@ -42,34 +41,36 @@ pip install rejected[sentry] # Sentry error reporting
4241

4342
## Documentation
4443

45-
Full documentation is available at [https://rejected.readthedocs.io](https://rejected.readthedocs.io).
44+
Full documentation is available at [https://gmr.github.io/rejected](https://gmr.github.io/rejected).
4645

4746
## Example Consumer
4847

4948
```python
50-
from rejected import consumer
5149
import logging
5250

51+
import rejected
52+
5353
LOGGER = logging.getLogger(__name__)
5454

5555

56-
class Test(consumer.Consumer):
56+
class Test(rejected.Consumer):
5757

5858
async def process(self) -> None:
5959
LOGGER.debug('In Test.process: %s', self.body)
6060
```
6161

62-
For concurrent message processing, use `TransactionConsumer`:
62+
For concurrent message processing, use `FunctionalConsumer`:
6363

6464
```python
65-
from rejected import consumer, models
6665
import logging
6766

67+
import rejected
68+
6869
LOGGER = logging.getLogger(__name__)
6970

7071

71-
class Test(consumer.TransactionConsumer):
72+
class Test(rejected.FunctionalConsumer):
7273

73-
async def process(self, ctx: models.ProcessingContext) -> None:
74+
async def process(self, ctx: rejected.ProcessingContext) -> None:
7475
LOGGER.debug('Processing: %s', ctx.message.body)
7576
```

docs/api.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ The primary base classes for building message consumers.
4949
- timestamp
5050
- user_id
5151

52-
### TransactionConsumer
52+
### FunctionalConsumer
5353

54-
::: rejected.consumer.TransactionConsumer
54+
::: rejected.consumer.FunctionalConsumer
5555
options:
5656
members:
5757
- process

docs/configuration.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ Each consumer entry is a named object with the following attributes:
122122
| `qty` | Number of consumer processes to run | `1` |
123123
| `queue` | RabbitMQ queue name to consume from (defaults to consumer name) | |
124124
| `ack` | Explicitly acknowledge messages (`no_ack = !ack`) | `true` |
125-
| `qos_prefetch` | QoS prefetch count (set > 1 for concurrent processing with `TransactionConsumer`) | `1` |
125+
| `qos_prefetch` | QoS prefetch count (set > 1 for concurrent processing with `FunctionalConsumer`) | `1` |
126126
| `max_errors` | Errors within 60s before restarting the consumer | `5` |
127127
| `error_exchange` | Exchange to republish messages to on `ProcessingException` | |
128128
| `error_max_retry` | Max `ProcessingException` retries before dropping | |

docs/consumer_howto.md

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,16 @@ The following example illustrates a very simple consumer that logs each
66
message body as it's received.
77

88
```python
9-
from rejected import consumer
109
import logging
1110

11+
import rejected
12+
1213
__version__ = '1.0.0'
1314

1415
LOGGER = logging.getLogger(__name__)
1516

1617

17-
class ExampleConsumer(consumer.Consumer):
18+
class ExampleConsumer(rejected.Consumer):
1819

1920
async def process(self):
2021
LOGGER.info(self.body)
@@ -51,22 +52,23 @@ error counter. When too many errors occur, rejected will automatically restart
5152
the consumer after a brief quiet period.
5253

5354
```python
54-
from rejected import consumer, exceptions
5555
import logging
5656

57+
import rejected
58+
5759
__version__ = '1.0.0'
5860

5961
LOGGER = logging.getLogger(__name__)
6062

6163

62-
class ExampleConsumer(consumer.Consumer):
64+
class ExampleConsumer(rejected.Consumer):
6365

6466
def _connect_to_database(self):
6567
return False
6668

6769
async def process(self):
6870
if not self._connect_to_database():
69-
raise exceptions.ConsumerException('Database error')
71+
raise rejected.ConsumerException('Database error')
7072
LOGGER.info(self.body)
7173
```
7274

@@ -128,7 +130,7 @@ If the type does not match:
128130
- Otherwise, a `MessageException` is raised.
129131

130132
```python
131-
class StrictConsumer(consumer.Consumer):
133+
class StrictConsumer(rejected.Consumer):
132134
MESSAGE_TYPE = 'user.created'
133135
DROP_INVALID_MESSAGES = True
134136
DROP_EXCHANGE = 'dead-letter'
@@ -143,7 +145,7 @@ Consumers can publish messages using `publish_message`. Note that it is
143145
an `async` method:
144146

145147
```python
146-
class ExampleConsumer(consumer.Consumer):
148+
class ExampleConsumer(rejected.Consumer):
147149

148150
async def process(self):
149151
LOGGER.info(self.body)
@@ -160,29 +162,30 @@ The `properties` parameter is a dict of AMQP properties (e.g., `content_type`,
160162
serialized and compressed based on the `content_type` and `content_encoding`
161163
properties.
162164

163-
## TransactionConsumer
165+
## FunctionalConsumer
164166

165-
`TransactionConsumer` is designed for concurrent message processing. Unlike
167+
`FunctionalConsumer` is designed for concurrent message processing. Unlike
166168
`Consumer`, it does not hold a lock -- multiple messages may be processed
167169
in parallel. Instead of accessing message properties via `self.body` etc.,
168170
the processing context is passed explicitly:
169171

170172
```python
171-
from rejected import consumer, models
172173
import logging
173174

175+
import rejected
176+
174177
LOGGER = logging.getLogger(__name__)
175178

176179

177-
class MyConcurrentConsumer(consumer.TransactionConsumer):
180+
class MyConcurrentConsumer(rejected.FunctionalConsumer):
178181

179-
async def prepare(self, ctx: models.ProcessingContext):
182+
async def prepare(self, ctx: rejected.ProcessingContext):
180183
LOGGER.debug('Preparing to process %s', ctx.message.message_id)
181184

182-
async def process(self, ctx: models.ProcessingContext):
185+
async def process(self, ctx: rejected.ProcessingContext):
183186
LOGGER.info('Processing: %s', ctx.message.body)
184187

185-
async def finish(self, ctx: models.ProcessingContext):
188+
async def finish(self, ctx: rejected.ProcessingContext):
186189
LOGGER.debug('Finished processing %s', ctx.message.message_id)
187190
```
188191

@@ -196,7 +199,7 @@ The `ProcessingContext` provides:
196199
- `ctx.result` -- the `Result` enum indicating message disposition
197200
- `ctx.received_at` -- monotonic timestamp when the message was received
198201

199-
Use `TransactionConsumer` with `qos_prefetch > 1` to process multiple
202+
Use `FunctionalConsumer` with `qos_prefetch > 1` to process multiple
200203
messages concurrently.
201204

202205
## Custom Metrics
@@ -205,7 +208,7 @@ Consumers can emit custom metrics that are forwarded to statsd and/or
205208
Prometheus:
206209

207210
```python
208-
class MetricsConsumer(consumer.Consumer):
211+
class MetricsConsumer(rejected.Consumer):
209212

210213
async def process(self):
211214
# Increment a counter
@@ -229,10 +232,12 @@ after processing messages. By default it collects every 10,000 messages.
229232
Configure the frequency via the `gc_collection_frequency` setting:
230233

231234
```python
232-
from rejected import consumer, mixins
235+
from rejected.mixins import GarbageCollectorMixin
236+
237+
import rejected
233238

234239

235-
class MyConsumer(mixins.GarbageCollectorMixin, consumer.Consumer):
240+
class MyConsumer(GarbageCollectorMixin, rejected.Consumer):
236241

237242
async def process(self):
238243
...

docs/index.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ data from the consumer processes and report on it.
1414
- Async consumers built on `asyncio`
1515
- Automatic exception handling including connection management and consumer restarting
1616
- Smart consumer classes that automatically decode and deserialize message bodies based on message headers
17-
- Concurrent message processing with `TransactionConsumer`
17+
- Concurrent message processing with `FunctionalConsumer`
1818
- Metrics via statsd and/or Prometheus
1919
- Built-in profiling of consumer code
2020
- Avro schema support with file and HTTP schema registries
@@ -39,13 +39,14 @@ pip install rejected[sentry] # Sentry error reporting
3939
## Quick Start
4040

4141
```python
42-
from rejected import consumer
4342
import logging
4443

44+
import rejected
45+
4546
LOGGER = logging.getLogger(__name__)
4647

4748

48-
class ExampleConsumer(consumer.Consumer):
49+
class ExampleConsumer(rejected.Consumer):
4950

5051
async def process(self):
5152
LOGGER.info(self.body)

0 commit comments

Comments
 (0)