Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ Application:
# sentry_dsn: https://[YOUR-SENTRY-DSN]
stats:
log: True
prometheus:
enabled: False
port: 9090
statsd:
enabled: False
host: localhost
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ dependencies = [
avro = ["fastavro>=1.7", "requests>=2.28"]
html = ["beautifulsoup4"]
msgpack = ["u-msgpack-python"]
prometheus = ["prometheus-client>=0.20"]
sentry = ["sentry-sdk>=2,<3"]

[project.scripts]
Expand Down
8 changes: 8 additions & 0 deletions rejected/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,16 @@ class StatsdConfig(pydantic.BaseModel):
include_hostname: bool = True


class PrometheusConfig(pydantic.BaseModel):
enabled: bool = False
port: int = 9090


class StatsConfig(pydantic.BaseModel):
log: bool = False
prometheus: PrometheusConfig = pydantic.Field(
default_factory=PrometheusConfig
)
statsd: StatsdConfig = pydantic.Field(default_factory=StatsdConfig)


Expand Down
6 changes: 5 additions & 1 deletion rejected/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import psutil

from . import __version__, process, state
from . import __version__, process, prometheus, state

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -206,6 +206,7 @@ def collect_results(self, data_values):

# Calculate the stats
self.stats = self.calculate_stats(self.last_poll_results)
prometheus.update(self.stats)

@staticmethod
def consumer_keyword(counts):
Expand Down Expand Up @@ -588,6 +589,9 @@ def run(self):
self.set_state(self.STATE_ACTIVE)
self.setup_consumers()

if self.config.stats.prometheus.enabled:
prometheus.start(self.config.stats.prometheus.port)

# Set the SIGCHLD handler for child creation errors
signal.signal(signal.SIGCHLD, self.on_sigchld)

Expand Down
86 changes: 86 additions & 0 deletions rejected/prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""Prometheus metrics exporter for rejected.

Exposes per-consumer metrics via an HTTP endpoint that Prometheus scrapes.
Requires ``rejected[prometheus]`` to be installed.

"""

import logging

try:
import prometheus_client
except ImportError:
prometheus_client = None

LOGGER = logging.getLogger(__name__)

# Metrics are module-level singletons, created on first call to start().
_messages_processed = None
_messages_failed = None
_messages_redelivered = None
_consumer_processes = None
_started = False


def start(port: int) -> None:
"""Start the Prometheus HTTP metrics server.

:param int port: The port to listen on

"""
global _messages_processed, _messages_failed
global _messages_redelivered, _consumer_processes, _started

if not prometheus_client:
LOGGER.error(
'prometheus_client is not installed; install rejected[prometheus]'
)
return

if _started:
LOGGER.warning('Prometheus exporter already running')
return

_messages_processed = prometheus_client.Gauge(
'rejected_messages_processed_total',
'Total messages processed',
['consumer'],
)
_messages_failed = prometheus_client.Gauge(
'rejected_messages_failed_total',
'Total messages that resulted in errors',
['consumer'],
)
_messages_redelivered = prometheus_client.Gauge(
'rejected_messages_redelivered_total',
'Total redelivered messages',
['consumer'],
)
_consumer_processes = prometheus_client.Gauge(
'rejected_consumer_processes',
'Number of active consumer processes',
['consumer'],
)

prometheus_client.start_http_server(port)
_started = True
LOGGER.info('Prometheus metrics server started on port %d', port)


def update(stats: dict) -> None:
"""Update Prometheus metrics from the MCP stats dict.

:param dict stats: The stats dict from MCP.calculate_stats()

"""
if not _started:
return

consumers = stats.get('consumers', {})
for name, data in consumers.items():
_messages_processed.labels(consumer=name).set(data.get('processed', 0))
_messages_failed.labels(consumer=name).set(data.get('failed', 0))
_messages_redelivered.labels(consumer=name).set(
data.get('redelivered', 0)
)
_consumer_processes.labels(consumer=name).set(data.get('processes', 0))
Loading