Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ Application:
# sentry_dsn: https://[YOUR-SENTRY-DSN]
stats:
log: True
prometheus:
enabled: False
port: 9090
statsd:
enabled: False
host: localhost
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ dependencies = [
avro = ["fastavro>=1.7", "requests>=2.28"]
html = ["beautifulsoup4"]
msgpack = ["u-msgpack-python"]
prometheus = ["prometheus-client>=0.20"]
sentry = ["sentry-sdk>=2,<3"]

[project.scripts]
Expand Down
8 changes: 8 additions & 0 deletions rejected/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,16 @@ class StatsdConfig(pydantic.BaseModel):
include_hostname: bool = True


class PrometheusConfig(pydantic.BaseModel):
enabled: bool = False
port: int = 9090


class StatsConfig(pydantic.BaseModel):
log: bool = False
prometheus: PrometheusConfig = pydantic.Field(
default_factory=PrometheusConfig
)
statsd: StatsdConfig = pydantic.Field(default_factory=StatsdConfig)


Expand Down
25 changes: 24 additions & 1 deletion rejected/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import psutil

from . import __version__, process, state
from . import __version__, process, prometheus, state

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -199,13 +199,24 @@ def collect_results(self, data_values):
process_name = data_values['name']
del data_values['name']

# Forward per-message observations to Prometheus
prometheus.observe(
consumer_name,
data_values.pop('durations', []),
data_values.pop('message_ages', []),
data_values.pop('custom_durations', {}),
data_values.pop('custom_counters', {}),
data_values.pop('custom_gauges', {}),
)

# Add it to our last poll global data
if consumer_name not in self.last_poll_results:
self.last_poll_results[consumer_name] = {}
self.last_poll_results[consumer_name][process_name] = data_values

# Calculate the stats
self.stats = self.calculate_stats(self.last_poll_results)
prometheus.update(self.stats)

@staticmethod
def consumer_keyword(counts):
Expand All @@ -225,9 +236,18 @@ def consumer_stats_counter():

"""
return {
process.Process.ACKED: 0,
process.Process.CONSUMER_EXCEPTION: 0,
process.Process.DROPPED: 0,
process.Process.ERROR: 0,
process.Process.MESSAGE_EXCEPTION: 0,
process.Process.NACKED: 0,
process.Process.PROCESSED: 0,
process.Process.PROCESSING_EXCEPTION: 0,
process.Process.REDELIVERED: 0,
process.Process.REQUEUED: 0,
process.Process.TIME_SPENT: 0,
process.Process.UNHANDLED_EXCEPTION: 0,
}

def get_consumer_process(self, consumer, name):
Expand Down Expand Up @@ -588,6 +608,9 @@ def run(self):
self.set_state(self.STATE_ACTIVE)
self.setup_consumers()

if self.config.stats.prometheus.enabled:
prometheus.start(self.config.stats.prometheus.port)

# Set the SIGCHLD handler for child creation errors
signal.signal(signal.SIGCHLD, self.on_sigchld)

Expand Down
41 changes: 41 additions & 0 deletions rejected/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,11 @@ def __init__(
self.pending = collections.deque()
self.prepend_path = None
self.previous = None
self._duration_observations: list[float] = []
self._message_age_observations: list[float] = []
self._custom_durations: dict[str, list[float]] = {}
self._custom_counters: dict[str, int] = {}
self._custom_gauges: dict[str, float] = {}
self.sentry_client = None
self.state = self.STATE_INITIALIZING
self._tasks: set[asyncio.Task] = set()
Expand Down Expand Up @@ -802,6 +807,7 @@ def on_processed(self, message, result, start_time):
duration = time.monotonic() - start_time
self.counters[self.TIME_SPENT] += duration
self.measurement.add_duration(self.TIME_SPENT, duration)
self._duration_observations.append(duration)

if result == data.MESSAGE_DROP:
LOGGER.debug('Rejecting message due to drop return from consumer')
Expand Down Expand Up @@ -843,6 +849,11 @@ def on_processed(self, message, result, start_time):

self.counters[self.PROCESSED] += 1
self.measurement.set_tag(self.PROCESSED, True)
if message.properties.timestamp:
age = time.time() - message.properties.timestamp
if age > 0:
self._message_age_observations.append(age)
self._collect_custom_measurements()
self.maybe_submit_measurement()
self.reset_state()

Expand Down Expand Up @@ -943,6 +954,24 @@ def reject(self, message, requeue=True):
self.measurement.set_tag(self.NACKED, True)
self.measurement.set_tag(self.REQUEUED, requeue)

def _collect_custom_measurements(self):
"""Accumulate per-message Measurement data for Prometheus."""
if not self.measurement:
return
# Custom durations (excluding processing_time, already tracked)
for key, values in self.measurement.durations.items():
if key == self.TIME_SPENT:
continue
self._custom_durations.setdefault(key, []).extend(values)
# Custom counters
for key, value in self.measurement.counters.items():
self._custom_counters[key] = (
self._custom_counters.get(key, 0) + value
)
# Custom gauges (values dict on Measurement)
for key, value in self.measurement.values.items():
self._custom_gauges[key] = value

def report_stats(self):
"""Create the dict of stats data for the MCP stats queue"""
if not self.previous:
Expand All @@ -954,8 +983,20 @@ def report_stats(self):
'consumer_name': self.consumer_name,
'counts': dict(self.counters),
'previous': dict(self.previous),
'durations': list(self._duration_observations),
'message_ages': list(self._message_age_observations),
'custom_durations': {
k: list(v) for k, v in self._custom_durations.items()
},
'custom_counters': dict(self._custom_counters),
'custom_gauges': dict(self._custom_gauges),
}
self.previous = dict(self.counters)
self._duration_observations.clear()
self._message_age_observations.clear()
self._custom_durations.clear()
self._custom_counters.clear()
self._custom_gauges.clear()
return values

def reset_error_counter(self):
Expand Down
Loading
Loading