Skip to content

Commit 8478c39

Browse files
authored
Merge pull request #59 from gmr/prometheus-exporter
Add Prometheus metrics exporter
2 parents 133907e + a13b1cf commit 8478c39

8 files changed

Lines changed: 426 additions & 25 deletions

File tree

docs/configuration.md

Lines changed: 67 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ YAML's format, like Python code, is whitespace dependent for control structure i
55
blocks. If you're having problems with your rejected configuration, the first
66
thing you should do is ensure that the YAML syntax is correct.
77

8-
The configuration file is split into three main sections: `Application`, `Daemon`, and `Logging`.
8+
The configuration file is split into two main sections: `Application` and `Logging`.
99

1010
## Application
1111

@@ -15,7 +15,7 @@ The application section of the configuration is broken down into multiple top-le
1515
|--------|-------------|
1616
| `poll_interval` | How often rejected should poll consumer processes for status in seconds (int/float) |
1717
| `sentry_dsn` | If Sentry support is installed, optionally set a global DSN for all consumers (str) |
18-
| `stats` | Enable and configure statsd metric submission (obj) |
18+
| `stats` | Enable and configure metrics submission via statsd and/or Prometheus (obj) |
1919
| `Connections` | A subsection with RabbitMQ connection information for consumers (obj) |
2020
| `Consumers` | Where each consumer type is configured (obj) |
2121

@@ -24,19 +24,19 @@ The application section of the configuration is broken down into multiple top-le
2424
| Option | Description |
2525
|--------|-------------|
2626
| `log` | Toggle top-level logging of consumer process stats (bool) |
27-
| `influxdb` | Configure the submission of per-message measurements to InfluxDB (obj) |
27+
| `prometheus` | Configure the Prometheus metrics exporter (obj) |
2828
| `statsd` | Configure the submission of per-message measurements to statsd (obj) |
2929

30-
#### influxdb
30+
#### prometheus
31+
32+
Requires `rejected[prometheus]` to be installed.
3133

3234
| Option | Description |
3335
|--------|-------------|
34-
| `scheme` | The scheme to use when submitting metrics to the InfluxDB server. Default: `http` (str) |
35-
| `host` | The hostname or ip address of the InfluxDB server. Default: `localhost` (str) |
36-
| `port` | The port of the influxdb server. Default: `8086` (int) |
37-
| `user` | An optional username to use when submitting measurements. (str) |
38-
| `password` | An optional password to use when submitting measurements. (str) |
39-
| `database` | The InfluxDB database to submit measurements to. Default: `rejected` (str) |
36+
| `enabled` | Toggle the Prometheus metrics HTTP endpoint on and off (bool) |
37+
| `port` | The port to serve the `/metrics` endpoint on. Default: `9090` (int) |
38+
39+
See [Prometheus Metrics](#prometheus-metrics) below for the full list of exposed metrics.
4040

4141
#### statsd
4242

@@ -94,7 +94,7 @@ Each consumer entry should be a nested object with a unique name with consumer a
9494
| `message_type` | Used to validate the message type of a message before processing (str or list) |
9595
| `error_exchange` | The exchange to publish messages that raise `ProcessingException` to (str) |
9696
| `error_max_retry` | The number of `ProcessingException` raised on a message before dropping it (int) |
97-
| `influxdb_measurement` | When using InfluxDB, the measurement name for per-message measurements (str) |
97+
| `schema_uri_format` | Avro schema URI format with `{0}` placeholder for message type. Supports `file://` and `http(s)://` schemes. Requires `rejected[avro]` (str) |
9898
| `config` | Free-form key-value configuration section for the consumer (obj) |
9999

100100
#### Consumer Connections
@@ -130,16 +130,6 @@ Structured connection options:
130130
| `consume` | Specify if the connection should consume on the connection (bool) |
131131
| `publisher_confirmation` | Enable publisher confirmations (bool) |
132132

133-
## Daemon
134-
135-
This section contains the settings required to run the application as a daemon:
136-
137-
| Option | Description |
138-
|--------|-------------|
139-
| `user` | The username to run as when the process is daemonized (str) |
140-
| `group` | Optional group name to switch to when the process is daemonized (str) |
141-
| `pidfile` | The pidfile to write when the process is daemonized (str) |
142-
143133
## Logging
144134

145135
Rejected uses `logging.config.dictConfig` to create a flexible method for
@@ -181,3 +171,59 @@ If your application is not logging anything, ensure that you have created a
181171
logger section in your configuration for your consumer package. For example, if
182172
your Consumer instance is named `myconsumer.MyConsumer`, make sure there is a
183173
`myconsumer` logger in the logging configuration.
174+
175+
## Prometheus Metrics
176+
177+
When `stats.prometheus.enabled` is `true` and `rejected[prometheus]` is installed,
178+
rejected exposes a `/metrics` HTTP endpoint on the configured port for Prometheus
179+
to scrape.
180+
181+
### Built-in Counters
182+
183+
All labeled by `consumer` name.
184+
185+
| Metric | Description |
186+
|--------|-------------|
187+
| `rejected_messages_acked_total` | Messages acknowledged |
188+
| `rejected_messages_dropped_total` | Messages dropped |
189+
| `rejected_messages_failed_total` | Messages that resulted in errors |
190+
| `rejected_messages_nacked_total` | Messages negatively acknowledged |
191+
| `rejected_messages_processed_total` | Messages processed |
192+
| `rejected_messages_redelivered_total` | Redelivered messages |
193+
| `rejected_messages_requeued_total` | Messages requeued |
194+
| `rejected_processing_seconds_total` | Total processing time in seconds |
195+
| `rejected_exceptions_total` | Exceptions (labeled by `consumer` and `type`) |
196+
197+
Exception `type` values: `consumer_exception`, `message_exception`,
198+
`processing_exception`, `unhandled_exception`.
199+
200+
### Built-in Histograms
201+
202+
All labeled by `consumer` name.
203+
204+
| Metric | Description |
205+
|--------|-------------|
206+
| `rejected_processing_duration_seconds` | Per-message processing duration |
207+
| `rejected_message_age_seconds` | Age of messages at time of processing |
208+
209+
### Built-in Gauges
210+
211+
| Metric | Description |
212+
|--------|-------------|
213+
| `rejected_consumer_processes` | Number of active consumer processes |
214+
215+
### Custom Metrics
216+
217+
Metrics created via `Consumer.stats_add_duration`, `Consumer.stats_incr`, and
218+
`Consumer.stats_set_value` are automatically forwarded to Prometheus as
219+
dynamically created metrics:
220+
221+
| Consumer Method | Prometheus Type | Metric Name Pattern |
222+
|----------------|----------------|---------------------|
223+
| `stats_add_duration(key, value)` | Histogram | `rejected_custom_{key}_seconds` |
224+
| `stats_track_duration(key)` | Histogram | `rejected_custom_{key}_seconds` |
225+
| `stats_incr(key, value)` | Counter | `rejected_custom_{key}_total` |
226+
| `stats_set_value(key, value)` | Gauge | `rejected_custom_{key}` |
227+
228+
Metric names are sanitized: any characters not in `[a-zA-Z0-9_]` are replaced
229+
with underscores.

docs/index.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ data from the consumer processes and report on it.
1313

1414
- Automatic exception handling including connection management and consumer restarting
1515
- Smart consumer classes that can automatically decode and deserialize message bodies based upon message headers
16-
- Metrics logging and submission to statsd and InfluxDB
16+
- Metrics via statsd and/or Prometheus
1717
- Built-in profiling of consumer code
1818
- Ability to write asynchronous code in consumers allowing for parallel communication with external resources
1919

@@ -26,8 +26,11 @@ pip install rejected
2626
For optional features:
2727

2828
```bash
29-
pip install rejected[html] # HTML message body support
30-
pip install rejected[msgpack] # MessagePack support
29+
pip install rejected[avro] # Avro datum serialization
30+
pip install rejected[html] # HTML message body support
31+
pip install rejected[msgpack] # MessagePack support
32+
pip install rejected[prometheus] # Prometheus metrics exporter
33+
pip install rejected[sentry] # Sentry error reporting
3134
```
3235

3336
## Quick Start

example.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ Application:
55
# sentry_dsn: https://[YOUR-SENTRY-DSN]
66
stats:
77
log: True
8+
prometheus:
9+
enabled: False
10+
port: 9090
811
statsd:
912
enabled: False
1013
host: localhost

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ dependencies = [
4040
avro = ["fastavro>=1.7", "requests>=2.28"]
4141
html = ["beautifulsoup4"]
4242
msgpack = ["u-msgpack-python"]
43+
prometheus = ["prometheus-client>=0.20"]
4344
sentry = ["sentry-sdk>=2,<3"]
4445

4546
[project.scripts]

rejected/config.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,16 @@ class StatsdConfig(pydantic.BaseModel):
4343
include_hostname: bool = True
4444

4545

46+
class PrometheusConfig(pydantic.BaseModel):
47+
enabled: bool = False
48+
port: int = 9090
49+
50+
4651
class StatsConfig(pydantic.BaseModel):
4752
log: bool = False
53+
prometheus: PrometheusConfig = pydantic.Field(
54+
default_factory=PrometheusConfig
55+
)
4856
statsd: StatsdConfig = pydantic.Field(default_factory=StatsdConfig)
4957

5058

rejected/mcp.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
import psutil
1616

17-
from . import __version__, process, state
17+
from . import __version__, process, prometheus, state
1818

1919
LOGGER = logging.getLogger(__name__)
2020

@@ -199,13 +199,24 @@ def collect_results(self, data_values):
199199
process_name = data_values['name']
200200
del data_values['name']
201201

202+
# Forward per-message observations to Prometheus
203+
prometheus.observe(
204+
consumer_name,
205+
data_values.pop('durations', []),
206+
data_values.pop('message_ages', []),
207+
data_values.pop('custom_durations', {}),
208+
data_values.pop('custom_counters', {}),
209+
data_values.pop('custom_gauges', {}),
210+
)
211+
202212
# Add it to our last poll global data
203213
if consumer_name not in self.last_poll_results:
204214
self.last_poll_results[consumer_name] = {}
205215
self.last_poll_results[consumer_name][process_name] = data_values
206216

207217
# Calculate the stats
208218
self.stats = self.calculate_stats(self.last_poll_results)
219+
prometheus.update(self.stats)
209220

210221
@staticmethod
211222
def consumer_keyword(counts):
@@ -225,9 +236,18 @@ def consumer_stats_counter():
225236
226237
"""
227238
return {
239+
process.Process.ACKED: 0,
240+
process.Process.CONSUMER_EXCEPTION: 0,
241+
process.Process.DROPPED: 0,
228242
process.Process.ERROR: 0,
243+
process.Process.MESSAGE_EXCEPTION: 0,
244+
process.Process.NACKED: 0,
229245
process.Process.PROCESSED: 0,
246+
process.Process.PROCESSING_EXCEPTION: 0,
230247
process.Process.REDELIVERED: 0,
248+
process.Process.REQUEUED: 0,
249+
process.Process.TIME_SPENT: 0,
250+
process.Process.UNHANDLED_EXCEPTION: 0,
231251
}
232252

233253
def get_consumer_process(self, consumer, name):
@@ -588,6 +608,9 @@ def run(self):
588608
self.set_state(self.STATE_ACTIVE)
589609
self.setup_consumers()
590610

611+
if self.config.stats.prometheus.enabled:
612+
prometheus.start(self.config.stats.prometheus.port)
613+
591614
# Set the SIGCHLD handler for child creation errors
592615
signal.signal(signal.SIGCHLD, self.on_sigchld)
593616

rejected/process.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,11 @@ def __init__(
486486
self.pending = collections.deque()
487487
self.prepend_path = None
488488
self.previous = None
489+
self._duration_observations: list[float] = []
490+
self._message_age_observations: list[float] = []
491+
self._custom_durations: dict[str, list[float]] = {}
492+
self._custom_counters: dict[str, int] = {}
493+
self._custom_gauges: dict[str, float] = {}
489494
self.sentry_client = None
490495
self.state = self.STATE_INITIALIZING
491496
self._tasks: set[asyncio.Task] = set()
@@ -802,6 +807,7 @@ def on_processed(self, message, result, start_time):
802807
duration = time.monotonic() - start_time
803808
self.counters[self.TIME_SPENT] += duration
804809
self.measurement.add_duration(self.TIME_SPENT, duration)
810+
self._duration_observations.append(duration)
805811

806812
if result == data.MESSAGE_DROP:
807813
LOGGER.debug('Rejecting message due to drop return from consumer')
@@ -843,6 +849,11 @@ def on_processed(self, message, result, start_time):
843849

844850
self.counters[self.PROCESSED] += 1
845851
self.measurement.set_tag(self.PROCESSED, True)
852+
if message.properties.timestamp:
853+
age = time.time() - message.properties.timestamp
854+
if age > 0:
855+
self._message_age_observations.append(age)
856+
self._collect_custom_measurements()
846857
self.maybe_submit_measurement()
847858
self.reset_state()
848859

@@ -943,6 +954,24 @@ def reject(self, message, requeue=True):
943954
self.measurement.set_tag(self.NACKED, True)
944955
self.measurement.set_tag(self.REQUEUED, requeue)
945956

957+
def _collect_custom_measurements(self):
958+
"""Accumulate per-message Measurement data for Prometheus."""
959+
if not self.measurement:
960+
return
961+
# Custom durations (excluding processing_time, already tracked)
962+
for key, values in self.measurement.durations.items():
963+
if key == self.TIME_SPENT:
964+
continue
965+
self._custom_durations.setdefault(key, []).extend(values)
966+
# Custom counters
967+
for key, value in self.measurement.counters.items():
968+
self._custom_counters[key] = (
969+
self._custom_counters.get(key, 0) + value
970+
)
971+
# Custom gauges (values dict on Measurement)
972+
for key, value in self.measurement.values.items():
973+
self._custom_gauges[key] = value
974+
946975
def report_stats(self):
947976
"""Create the dict of stats data for the MCP stats queue"""
948977
if not self.previous:
@@ -954,8 +983,20 @@ def report_stats(self):
954983
'consumer_name': self.consumer_name,
955984
'counts': dict(self.counters),
956985
'previous': dict(self.previous),
986+
'durations': list(self._duration_observations),
987+
'message_ages': list(self._message_age_observations),
988+
'custom_durations': {
989+
k: list(v) for k, v in self._custom_durations.items()
990+
},
991+
'custom_counters': dict(self._custom_counters),
992+
'custom_gauges': dict(self._custom_gauges),
957993
}
958994
self.previous = dict(self.counters)
995+
self._duration_observations.clear()
996+
self._message_age_observations.clear()
997+
self._custom_durations.clear()
998+
self._custom_counters.clear()
999+
self._custom_gauges.clear()
9591000
return values
9601001

9611002
def reset_error_counter(self):

0 commit comments

Comments
 (0)