Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 67 additions & 21 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ YAML's format, like Python code, is whitespace dependent for control structure i
blocks. If you're having problems with your rejected configuration, the first
thing you should do is ensure that the YAML syntax is correct.

The configuration file is split into three main sections: `Application`, `Daemon`, and `Logging`.
The configuration file is split into two main sections: `Application` and `Logging`.

## Application

Expand All @@ -15,7 +15,7 @@ The application section of the configuration is broken down into multiple top-le
|--------|-------------|
| `poll_interval` | How often rejected should poll consumer processes for status in seconds (int/float) |
| `sentry_dsn` | If Sentry support is installed, optionally set a global DSN for all consumers (str) |
| `stats` | Enable and configure statsd metric submission (obj) |
| `stats` | Enable and configure metrics submission via statsd and/or Prometheus (obj) |
| `Connections` | A subsection with RabbitMQ connection information for consumers (obj) |
| `Consumers` | Where each consumer type is configured (obj) |

Expand All @@ -24,19 +24,19 @@ The application section of the configuration is broken down into multiple top-le
| Option | Description |
|--------|-------------|
| `log` | Toggle top-level logging of consumer process stats (bool) |
| `influxdb` | Configure the submission of per-message measurements to InfluxDB (obj) |
| `prometheus` | Configure the Prometheus metrics exporter (obj) |
| `statsd` | Configure the submission of per-message measurements to statsd (obj) |

#### influxdb
#### prometheus

Requires `rejected[prometheus]` to be installed.

| Option | Description |
|--------|-------------|
| `scheme` | The scheme to use when submitting metrics to the InfluxDB server. Default: `http` (str) |
| `host` | The hostname or ip address of the InfluxDB server. Default: `localhost` (str) |
| `port` | The port of the influxdb server. Default: `8086` (int) |
| `user` | An optional username to use when submitting measurements. (str) |
| `password` | An optional password to use when submitting measurements. (str) |
| `database` | The InfluxDB database to submit measurements to. Default: `rejected` (str) |
| `enabled` | Toggle the Prometheus metrics HTTP endpoint on and off (bool) |
| `port` | The port to serve the `/metrics` endpoint on. Default: `9090` (int) |

See [Prometheus Metrics](#prometheus-metrics) below for the full list of exposed metrics.

#### statsd

Expand Down Expand Up @@ -94,7 +94,7 @@ Each consumer entry should be a nested object with a unique name with consumer a
| `message_type` | Used to validate the message type of a message before processing (str or list) |
| `error_exchange` | The exchange to publish messages that raise `ProcessingException` to (str) |
| `error_max_retry` | The number of `ProcessingException` raised on a message before dropping it (int) |
| `influxdb_measurement` | When using InfluxDB, the measurement name for per-message measurements (str) |
| `schema_uri_format` | Avro schema URI format with `{0}` placeholder for message type. Supports `file://` and `http(s)://` schemes. Requires `rejected[avro]` (str) |
| `config` | Free-form key-value configuration section for the consumer (obj) |

#### Consumer Connections
Expand Down Expand Up @@ -130,16 +130,6 @@ Structured connection options:
| `consume` | Specify if the connection should consume on the connection (bool) |
| `publisher_confirmation` | Enable publisher confirmations (bool) |

## Daemon

This section contains the settings required to run the application as a daemon:

| Option | Description |
|--------|-------------|
| `user` | The username to run as when the process is daemonized (str) |
| `group` | Optional group name to switch to when the process is daemonized (str) |
| `pidfile` | The pidfile to write when the process is daemonized (str) |

## Logging

Rejected uses `logging.config.dictConfig` to create a flexible method for
Expand Down Expand Up @@ -181,3 +171,59 @@ If your application is not logging anything, ensure that you have created a
logger section in your configuration for your consumer package. For example, if
your Consumer instance is named `myconsumer.MyConsumer`, make sure there is a
`myconsumer` logger in the logging configuration.

## Prometheus Metrics

When `stats.prometheus.enabled` is `true` and `rejected[prometheus]` is installed,
rejected exposes a `/metrics` HTTP endpoint on the configured port for Prometheus
to scrape.

### Built-in Counters

All labeled by `consumer` name.

| Metric | Description |
|--------|-------------|
| `rejected_messages_acked_total` | Messages acknowledged |
| `rejected_messages_dropped_total` | Messages dropped |
| `rejected_messages_failed_total` | Messages that resulted in errors |
| `rejected_messages_nacked_total` | Messages negatively acknowledged |
| `rejected_messages_processed_total` | Messages processed |
| `rejected_messages_redelivered_total` | Redelivered messages |
| `rejected_messages_requeued_total` | Messages requeued |
| `rejected_processing_seconds_total` | Total processing time in seconds |
| `rejected_exceptions_total` | Exceptions (labeled by `consumer` and `type`) |

Exception `type` values: `consumer_exception`, `message_exception`,
`processing_exception`, `unhandled_exception`.

### Built-in Histograms

All labeled by `consumer` name.

| Metric | Description |
|--------|-------------|
| `rejected_processing_duration_seconds` | Per-message processing duration |
| `rejected_message_age_seconds` | Age of messages at time of processing |

### Built-in Gauges

| Metric | Description |
|--------|-------------|
| `rejected_consumer_processes` | Number of active consumer processes |

### Custom Metrics

Metrics created via `Consumer.stats_add_duration`, `Consumer.stats_incr`, and
`Consumer.stats_set_value` are automatically forwarded to Prometheus as
dynamically created metrics:

| Consumer Method | Prometheus Type | Metric Name Pattern |
|----------------|----------------|---------------------|
| `stats_add_duration(key, value)` | Histogram | `rejected_custom_{key}_seconds` |
| `stats_track_duration(key)` | Histogram | `rejected_custom_{key}_seconds` |
| `stats_incr(key, value)` | Counter | `rejected_custom_{key}_total` |
| `stats_set_value(key, value)` | Gauge | `rejected_custom_{key}` |

Metric names are sanitized: any characters not in `[a-zA-Z0-9_]` are replaced
with underscores.
9 changes: 6 additions & 3 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ data from the consumer processes and report on it.

- Automatic exception handling including connection management and consumer restarting
- Smart consumer classes that can automatically decode and deserialize message bodies based upon message headers
- Metrics logging and submission to statsd and InfluxDB
- Metrics via statsd and/or Prometheus
- Built-in profiling of consumer code
- Ability to write asynchronous code in consumers allowing for parallel communication with external resources

Expand All @@ -26,8 +26,11 @@ pip install rejected
For optional features:

```bash
pip install rejected[html] # HTML message body support
pip install rejected[msgpack] # MessagePack support
pip install rejected[avro] # Avro datum serialization
pip install rejected[html] # HTML message body support
pip install rejected[msgpack] # MessagePack support
pip install rejected[prometheus] # Prometheus metrics exporter
pip install rejected[sentry] # Sentry error reporting
```

## Quick Start
Expand Down
3 changes: 3 additions & 0 deletions example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ Application:
# sentry_dsn: https://[YOUR-SENTRY-DSN]
stats:
log: True
prometheus:
enabled: False
port: 9090
statsd:
enabled: False
host: localhost
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ dependencies = [
avro = ["fastavro>=1.7", "requests>=2.28"]
html = ["beautifulsoup4"]
msgpack = ["u-msgpack-python"]
prometheus = ["prometheus-client>=0.20"]
sentry = ["sentry-sdk>=2,<3"]

[project.scripts]
Expand Down
8 changes: 8 additions & 0 deletions rejected/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,16 @@ class StatsdConfig(pydantic.BaseModel):
include_hostname: bool = True


class PrometheusConfig(pydantic.BaseModel):
enabled: bool = False
port: int = 9090


class StatsConfig(pydantic.BaseModel):
log: bool = False
prometheus: PrometheusConfig = pydantic.Field(
default_factory=PrometheusConfig
)
statsd: StatsdConfig = pydantic.Field(default_factory=StatsdConfig)


Expand Down
25 changes: 24 additions & 1 deletion rejected/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import psutil

from . import __version__, process, state
from . import __version__, process, prometheus, state

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -199,13 +199,24 @@ def collect_results(self, data_values):
process_name = data_values['name']
del data_values['name']

# Forward per-message observations to Prometheus
prometheus.observe(
consumer_name,
data_values.pop('durations', []),
data_values.pop('message_ages', []),
data_values.pop('custom_durations', {}),
data_values.pop('custom_counters', {}),
data_values.pop('custom_gauges', {}),
)

# Add it to our last poll global data
if consumer_name not in self.last_poll_results:
self.last_poll_results[consumer_name] = {}
self.last_poll_results[consumer_name][process_name] = data_values

# Calculate the stats
self.stats = self.calculate_stats(self.last_poll_results)
prometheus.update(self.stats)

@staticmethod
def consumer_keyword(counts):
Expand All @@ -225,9 +236,18 @@ def consumer_stats_counter():

"""
return {
process.Process.ACKED: 0,
process.Process.CONSUMER_EXCEPTION: 0,
process.Process.DROPPED: 0,
process.Process.ERROR: 0,
process.Process.MESSAGE_EXCEPTION: 0,
process.Process.NACKED: 0,
process.Process.PROCESSED: 0,
process.Process.PROCESSING_EXCEPTION: 0,
process.Process.REDELIVERED: 0,
process.Process.REQUEUED: 0,
process.Process.TIME_SPENT: 0,
process.Process.UNHANDLED_EXCEPTION: 0,
}

def get_consumer_process(self, consumer, name):
Expand Down Expand Up @@ -588,6 +608,9 @@ def run(self):
self.set_state(self.STATE_ACTIVE)
self.setup_consumers()

if self.config.stats.prometheus.enabled:
prometheus.start(self.config.stats.prometheus.port)

# Set the SIGCHLD handler for child creation errors
signal.signal(signal.SIGCHLD, self.on_sigchld)

Expand Down
41 changes: 41 additions & 0 deletions rejected/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,11 @@ def __init__(
self.pending = collections.deque()
self.prepend_path = None
self.previous = None
self._duration_observations: list[float] = []
self._message_age_observations: list[float] = []
self._custom_durations: dict[str, list[float]] = {}
self._custom_counters: dict[str, int] = {}
self._custom_gauges: dict[str, float] = {}
self.sentry_client = None
self.state = self.STATE_INITIALIZING
self._tasks: set[asyncio.Task] = set()
Expand Down Expand Up @@ -802,6 +807,7 @@ def on_processed(self, message, result, start_time):
duration = time.monotonic() - start_time
self.counters[self.TIME_SPENT] += duration
self.measurement.add_duration(self.TIME_SPENT, duration)
self._duration_observations.append(duration)

if result == data.MESSAGE_DROP:
LOGGER.debug('Rejecting message due to drop return from consumer')
Expand Down Expand Up @@ -843,6 +849,11 @@ def on_processed(self, message, result, start_time):

self.counters[self.PROCESSED] += 1
self.measurement.set_tag(self.PROCESSED, True)
if message.properties.timestamp:
age = time.time() - message.properties.timestamp
if age > 0:
self._message_age_observations.append(age)
self._collect_custom_measurements()
self.maybe_submit_measurement()
self.reset_state()

Expand Down Expand Up @@ -943,6 +954,24 @@ def reject(self, message, requeue=True):
self.measurement.set_tag(self.NACKED, True)
self.measurement.set_tag(self.REQUEUED, requeue)

def _collect_custom_measurements(self):
"""Accumulate per-message Measurement data for Prometheus."""
if not self.measurement:
return
# Custom durations (excluding processing_time, already tracked)
for key, values in self.measurement.durations.items():
if key == self.TIME_SPENT:
continue
self._custom_durations.setdefault(key, []).extend(values)
# Custom counters
for key, value in self.measurement.counters.items():
self._custom_counters[key] = (
self._custom_counters.get(key, 0) + value
)
# Custom gauges (values dict on Measurement)
for key, value in self.measurement.values.items():
self._custom_gauges[key] = value

def report_stats(self):
"""Create the dict of stats data for the MCP stats queue"""
if not self.previous:
Expand All @@ -954,8 +983,20 @@ def report_stats(self):
'consumer_name': self.consumer_name,
'counts': dict(self.counters),
'previous': dict(self.previous),
'durations': list(self._duration_observations),
'message_ages': list(self._message_age_observations),
'custom_durations': {
k: list(v) for k, v in self._custom_durations.items()
},
'custom_counters': dict(self._custom_counters),
'custom_gauges': dict(self._custom_gauges),
}
self.previous = dict(self.counters)
self._duration_observations.clear()
self._message_age_observations.clear()
self._custom_durations.clear()
self._custom_counters.clear()
self._custom_gauges.clear()
return values

def reset_error_counter(self):
Expand Down
Loading
Loading