diff --git a/.travis.yml b/.travis.yml index d11b035..8c67aa0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,12 +13,12 @@ branches: - master - develop -install: pip install --quiet --use-mirrors tox +install: pip install --quiet tox script: tox after_script: - if [ $TOXENV == "cov" ]; then - pip install --quiet --use-mirrors coveralls; + pip install --quiet coveralls; coveralls; fi diff --git a/README.md b/README.md index 0c045b6..58a7771 100644 --- a/README.md +++ b/README.md @@ -248,6 +248,32 @@ Currently two types of alerts are supported: ] ``` +##### Cron expression intervals + +In addition to checking metrics at a fixed time interval, graphite-beacon also +supports cron expressions. + +See the below example for how to check a metric every 20 minutes between 8am and 5pm on weekdays: + +```js +alerts: [ + { + "name": "Cron-based alert", + // the cron expression + "interval": "*/20 8-17 * * 1-5", + "query": "Your graphite query here", + "rules": [ + "warning: 10", + "critical: 20" + ] + } +], +``` + +For more information about cron expressions, see https://en.wikipedia.org/wiki/Cron#CRON_expression. + +To build and test cron expressions, try http://crontab.guru. + ##### Historical values graphite-beacon supports "historical" values for a rule. diff --git a/graphite_beacon/alerts.py b/graphite_beacon/alerts.py index 920efac..1253892 100644 --- a/graphite_beacon/alerts.py +++ b/graphite_beacon/alerts.py @@ -15,6 +15,8 @@ import math from collections import deque, defaultdict from itertools import islice +from croniter import croniter +from datetime import datetime, timedelta LOGGER = log.gen_log @@ -38,6 +40,59 @@ def __getitem__(self, index): return type(self)(islice(self, index.start, index.stop, index.step)) +class CronCallback(object): + + """Callback that runs on a cron schedule.""" + + def __init__(self, callback, cron): + """Initialize a CronCallback object with the specified cron schedule and callback.""" + self.callback = callback + self.cron = cron + self.is_running = False + self.handle = None + + def start(self): + """Start running.""" + if not self.is_running: + self.is_running = True + self.schedule_next_run() + + def stop(self): + """Stop running.""" + if self.is_running: + handle = self.handle + self.is_running = False + if handle: + ioloop.IOLoop.instance().remove_timeout(handle) + self.handle = None + + def is_running(self): + """Is running.""" + return self.is_running + + def scheduled_run(self): + """Invoke the callback and schedule the next run.""" + if self.is_running: + LOGGER.debug("CronCallback: running cron schedule") + try: + self.callback() + finally: + self.schedule_next_run() + + def schedule_next_run(self): + """Schedule the next run of this callback.""" + if self.is_running: + now = datetime.now() + next_time = self.cron.get_next(datetime) + while next_time <= now: + next_time = self.cron.get_next(datetime) + LOGGER.debug("CronCallback: now: %s", now) + LOGGER.debug("CronCallback: next_time: %s", next_time) + td = next_time - now + total_seconds = (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6 + self.handle = ioloop.IOLoop.instance().call_later(total_seconds, self.scheduled_run) + + class AlertFabric(type): """Register alert's classes and produce an alert by source.""" @@ -79,7 +134,8 @@ def __init__(self, reactor, **options): self.waiting = False self.state = {None: "normal", "waiting": "normal", "loading": "normal"} - self.history = defaultdict(lambda: sliceable_deque([], self.history_size)) + self.history = defaultdict(lambda: sliceable_deque([])) + self.history_times = defaultdict(lambda: sliceable_deque([])) LOGGER.info("Alert '%s': has inited", self) @@ -95,6 +151,10 @@ def __str__(self): """String representation.""" return "%s (%s)" % (self.name, self.interval) + def is_cron(self): + """Detect if an expression is a valid cron expression.""" + return len(self.interval.split()) in [5, 6] + def configure(self, name=None, rules=None, query=None, **options): """Configure the alert.""" self.name = name @@ -109,13 +169,6 @@ def configure(self, name=None, rules=None, query=None, **options): assert query, "%s: Alert's query is invalid" % self.name self.query = query - self.interval = interval_to_graphite( - options.get('interval', self.reactor.options['interval'])) - interval = parse_interval(self.interval) - - self.time_window = interval_to_graphite( - options.get('time_window', options.get('interval', self.reactor.options['interval']))) - self.until = interval_to_graphite( options.get('until', self.reactor.options['until']) ) @@ -128,15 +181,38 @@ def configure(self, name=None, rules=None, query=None, **options): self.history_size = options.get('history_size', self.reactor.options['history_size']) self.history_size = parse_interval(self.history_size) - self.history_size = int(math.ceil(self.history_size / interval)) + self.history_size = timedelta(milliseconds=self.history_size) self.no_data = options.get('no_data', self.reactor.options['no_data']) self.loading_error = options.get('loading_error', self.reactor.options['loading_error']) - if self.reactor.options.get('debug'): - self.callback = ioloop.PeriodicCallback(self.load, 5000) + self.interval = options.get('interval', self.reactor.options['interval']) + time_window = options.get('time_window', None) + + if self.is_cron(): + try: + cron = croniter(self.interval) + except Exception as e: + """Raise error if we failed parsing the cron interval""" + LOGGER.exception(e) + raise ValueError("Invalid cron expression '%s': %s" % (self.interval, e)) + assert time_window, "%s: Must supply time_window for cron scheduled alerts" % self.name + self.time_window = interval_to_graphite(time_window) + if self.reactor.options.get('debug'): + self.callback = ioloop.PeriodicCallback(self.load, 5000) + else: + self.callback = CronCallback(self.load, cron) else: - self.callback = ioloop.PeriodicCallback(self.load, interval) + self.interval = interval_to_graphite(self.interval) + interval = parse_interval(self.interval) + if time_window: + self.time_window = interval_to_graphite(time_window) + else: + self.time_window = interval_to_graphite(self.interval) + if self.reactor.options.get('debug'): + self.callback = ioloop.PeriodicCallback(self.load, 5000) + else: + self.callback = ioloop.PeriodicCallback(self.load, interval) def convert(self, value): """Convert self value.""" @@ -161,7 +237,7 @@ def stop(self): self.callback.stop() return self - def check(self, records): + def check(self, records, now=datetime.now()): """Check current value.""" for value, target in records: LOGGER.info("%s [%s]: %s", self.name, target, value) @@ -169,20 +245,28 @@ def check(self, records): self.notify(self.no_data, value, target) continue for rule in self.rules: - if self.evaluate_rule(rule, value, target): + if self.evaluate_rule(rule, value, target, now): self.notify(rule['level'], value, target, rule=rule) break else: self.notify('normal', value, target, rule=rule) - self.history[target].append(value) - - def evaluate_rule(self, rule, value, target): + history = self.history[target] + history.append(value) + history_times = self.history_times[target] + history_times.append(now) + history_threshold = now - self.history_size + """Remove historical values older than history_size""" + while len(history_times) > 0 and history_times[0] <= history_threshold: + history.popleft() + history_times.popleft() + + def evaluate_rule(self, rule, value, target, now): """Calculate the value.""" def evaluate(expr): if expr in LOGICAL_OPERATORS.values(): return expr - rvalue = self.get_value_for_expr(expr, target) + rvalue = self.get_value_for_expr(expr, target, now) if rvalue is None: return False # ignore this result return expr['op'](value, rvalue) @@ -194,14 +278,16 @@ def evaluate(expr): return evaluated[0] - def get_value_for_expr(self, expr, target): + def get_value_for_expr(self, expr, target, now=datetime.now()): """I have no idea.""" if expr in LOGICAL_OPERATORS.values(): return None rvalue = expr['value'] if rvalue == HISTORICAL: history = self.history[target] - if len(history) < self.history_size: + history_times = self.history_times[target] + """Don't return a historical value if the history buffer is not full""" + if len(history_times) < 1 or history_times[0] + self.history_size > now: return None rvalue = sum(history) / float(len(history)) diff --git a/requirements.txt b/requirements.txt index a07fb74..f81d5e8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ tornado == 4.1.0 funcparserlib==0.3.6 +croniter==0.3.10 diff --git a/tests.py b/tests.py index 098ef87..04be115 100644 --- a/tests.py +++ b/tests.py @@ -4,6 +4,21 @@ import pytest import mock +from graphite_beacon.alerts import BaseAlert +from graphite_beacon.utils import parse_interval +from datetime import datetime, timedelta + +"""Make every call to check() elapse the alert interval (or 10 minutes)""" +def check(alert, records, now=datetime.now()): + if not alert.is_cron(): + alert.delta = timedelta(milliseconds=parse_interval(alert.interval)) + alert._check(records, alert.time) + alert.time = alert.time + alert.delta + +BaseAlert.time = datetime.now() +BaseAlert.delta = timedelta(minutes=10) +BaseAlert._check = BaseAlert.check +BaseAlert.check = check @pytest.fixture def reactor(): @@ -59,7 +74,7 @@ def test_public_graphite_url(): def test_alert(reactor): - from graphite_beacon.alerts import BaseAlert, GraphiteAlert, URLAlert + from graphite_beacon.alerts import GraphiteAlert, URLAlert alert1 = BaseAlert.get(reactor, name='Test', query='*', rules=["normal: == 0"]) assert alert1 @@ -76,13 +91,16 @@ def test_alert(reactor): assert alert1 == alert3 assert set([alert1, alert3]) == set([alert1]) + alert4 = BaseAlert.get(reactor, name='Test', query='*', interval='*/10 09-18 * * 1-5', time_window='1h', rules=["normal: == 0"]) + assert alert4.interval == '*/10 09-18 * * 1-5' + alert4.start() + alert4.stop() + alert = BaseAlert.get(reactor, name='Test', query='*', rules=["warning: >= 3MB"]) assert alert.rules[0]['exprs'][0]['value'] == 3145728 def test_multimetrics(reactor): - from graphite_beacon.alerts import BaseAlert - alert = BaseAlert.get( reactor, name="Test", query="*", rules=[ "critical: > 100", "warning: > 50", "warning: < historical / 2"]) @@ -150,16 +168,17 @@ def test_multimetrics(reactor): def test_multiexpressions(reactor): - from graphite_beacon.alerts import BaseAlert - alert = BaseAlert.get( reactor, name="Test", query="*", rules=["warning: > historical * 1.05 AND > 70"]) reactor.alerts = set([alert]) with mock.patch.object(reactor, 'notify'): - alert.check([ - (50, 'metric1'), (65, 'metric1'), (85, 'metric1'), (65, 'metric1'), - (68, 'metric1'), (75, 'metric1')]) + alert.check([(50, 'metric1')]) + alert.check([(65, 'metric1')]) + alert.check([(85, 'metric1')]) + alert.check([(65, 'metric1')]) + alert.check([(68, 'metric1')]) + alert.check([(75, 'metric1')]) assert reactor.notify.call_count == 1 @@ -213,8 +232,6 @@ def test_convert(): def test_parse_interval(): - from graphite_beacon.utils import parse_interval - assert parse_interval(10) == 10000.0 assert parse_interval('10') == 10000.0 assert parse_interval('15s') == 15000.0 @@ -270,7 +287,6 @@ def test_parse_rule(): def test_html_template(reactor): from graphite_beacon.handlers.smtp import SMTPHandler - from graphite_beacon.alerts import BaseAlert target = 'node.com' galert = BaseAlert.get(reactor, name='Test', query='*', rules=["normal: == 0"]) diff --git a/tox.ini b/tox.ini index 530b4a7..6b6df49 100644 --- a/tox.ini +++ b/tox.ini @@ -7,6 +7,7 @@ deps = mock pytest tornado + croniter [testenv:cov] deps =