Skip to content

Commit 6590032

Browse files
author
Anze
committed
Add Collector sources
1 parent 5fa1c75 commit 6590032

File tree

2 files changed

+324
-0
lines changed

2 files changed

+324
-0
lines changed

grafoleancollector/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .collector import Collector

grafoleancollector/collector.py

+323
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,323 @@
1+
import sys
2+
import requests
3+
import logging
4+
import time
5+
import math
6+
from datetime import datetime, timedelta
7+
from pytz import utc
8+
from abc import abstractmethod
9+
10+
import concurrent.futures
11+
import traceback
12+
13+
from apscheduler.schedulers.background import BackgroundScheduler
14+
from apscheduler.triggers.base import BaseTrigger
15+
from apscheduler.executors.base import BaseExecutor
16+
from apscheduler.events import (
17+
JobExecutionEvent, EVENT_JOB_MISSED, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED)
18+
19+
20+
class MultipleIntervalsTrigger(BaseTrigger):
    """
    An APScheduler trigger that fires on several intervals at once:
    - every invocation is aligned to a whole second (which keeps the interval
      arithmetic simple)
    - when several intervals coincide on the same second, only a single job
      invocation happens
    - it records which intervals caused each invocation; records older than
      `forget_affecting_after` seconds are discarded
    """
    __slots__ = 'intervals', 'start_ts', 'affecting_intervals', 'forget_affecting_after'

    def __init__(self, intervals, forget_affecting_after=300):
        if not intervals:
            raise Exception("At least one interval must be specified")
        # we only work with whole seconds, and duplicates are meaningless:
        self.intervals = list({int(interval) for interval in intervals})
        self.forget_affecting_after = forget_affecting_after
        self.start_ts = int(time.time())
        self.affecting_intervals = {}

    def get_next_fire_time(self, previous_fire_time, now):
        # Everything here is UTC and whole seconds, so (at low travelling
        # speeds) plain UNIX timestamps are safe to work with.
        seconds_since_start = now.timestamp() - self.start_ts
        # for every interval, the earliest offset (relative to start) at which it fires next:
        candidate_offsets = [int(math.ceil(seconds_since_start / interval) * interval) for interval in self.intervals]
        earliest_offset = min(candidate_offsets)

        # HACK: APScheduler offers no channel for passing information about the
        # triggering intervals to the job being executed, so we stash it on the
        # trigger object itself; the executor later forwards it as a job
        # parameter. Not ideal, but it gets the information across.
        fire_ts = self.start_ts + earliest_offset
        self.affecting_intervals[fire_ts] = [
            interval
            for interval, offset in zip(self.intervals, candidate_offsets)
            if offset == earliest_offset
        ]

        self._cleanup(now.timestamp() - self.forget_affecting_after)
        return datetime.fromtimestamp(fire_ts, tz=utc)

    def _cleanup(self, limit_ts):
        # forget which intervals affected fire times older than limit_ts:
        stale = [ts for ts in self.affecting_intervals if ts < limit_ts]
        for ts in stale:
            del self.affecting_intervals[ts]
67+
class IntervalsAwareProcessPoolExecutor(BaseExecutor):
    """
    This class merges APScheduler's BasePoolExecutor and ProcessPoolExecutor,
    because we need to use our own version of `run_job` (with a small detail
    changed - additional parameter passed). Unfortunately there is probably no
    cleaner way to do this at the moment.
    """
    def __init__(self, max_workers=10):
        super().__init__()
        # jobs are executed in worker processes (not threads), see run_job below:
        self._pool = concurrent.futures.ProcessPoolExecutor(int(max_workers))

    def _do_submit_job(self, job, run_times):
        """
        This function is copy-pasted from apscheduler/executors/pool.py
        (`BasePoolExecutor._do_submit_job()`). The difference is that it calls our own
        version of `run_job`.
        """
        def callback(f):
            # `exception_info()` exists on some future implementations but not on
            # concurrent.futures ones, hence the feature check:
            exc, tb = (f.exception_info() if hasattr(f, 'exception_info') else
                       (f.exception(), getattr(f.exception(), '__traceback__', None)))
            if exc:
                self._run_job_error(job.id, exc, tb)
            else:
                self._run_job_success(job.id, f.result())

        # note: submits the static method (picklable), not a bound one, so it can
        # cross the process boundary:
        f = self._pool.submit(IntervalsAwareProcessPoolExecutor.run_job, job, job._jobstore_alias, run_times, self._logger.name)
        f.add_done_callback(callback)

    def shutdown(self, wait=True):
        # shut down the underlying process pool; with wait=True this blocks until
        # pending futures are done:
        self._pool.shutdown(wait)

    @staticmethod
    def run_job(job, jobstore_alias, run_times, logger_name):
        """
        This function is copy-pasted from apscheduler/executors/base.py (`run_job()`). It is defined
        as static method here, and only the invocation of the job (`job.func()` call) was changed.

        The reason for this is that we need to pass `affecting_intervals` from the trigger to the job
        function, so it can decide which parts of the job need to be run. SNMPCollector needs this
        so it can fetch data either separately, or for all of the task at the same time, when their
        intervals align.

        The changes are in a single block and are marked with a comment.

        ---
        Called by executors to run the job. Returns a list of scheduler events to be dispatched by the
        scheduler.
        """
        events = []
        logger = logging.getLogger(logger_name)
        for run_time in run_times:
            # See if the job missed its run time window, and handle
            # possible misfires accordingly
            if job.misfire_grace_time is not None:
                difference = datetime.now(utc) - run_time
                grace_time = timedelta(seconds=job.misfire_grace_time)
                if difference > grace_time:
                    events.append(JobExecutionEvent(EVENT_JOB_MISSED, job.id, jobstore_alias,
                                                    run_time))
                    logger.warning('Run time of job "%s" was missed by %s', job, difference)
                    continue

            logger.info('Running job "%s" (scheduled at %s)', job, run_time)
            try:
                ##########################
                ### changes
                ##########################
                # original line, for reference:
                # retval = job.func(*job.args, **job.kwargs)
                # look up which intervals caused this fire time (recorded by
                # MultipleIntervalsTrigger.get_next_fire_time) and pass them to the job:
                affecting_intervals = job.trigger.affecting_intervals[run_time.timestamp()]
                retval = job.func(affecting_intervals, **job.kwargs)
                ##########################
                ### /changes
                ##########################
            except BaseException:
                exc, tb = sys.exc_info()[1:]
                formatted_tb = ''.join(traceback.format_tb(tb))
                events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time,
                                                exception=exc, traceback=formatted_tb))
                logger.exception('Job "%s" raised an exception', job)

                # This is to prevent cyclic references that would lead to memory leaks
                traceback.clear_frames(tb)
                del tb
            else:
                events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time,
                                                retval=retval))
                logger.info('Job "%s" executed successfully', job)

        return events

    def _run_job_error(self, job_id, exc, traceback=None):
        """
        > Called by the executor with the exception if there is an error calling `run_job`.

        Sometimes we start getting traceback, after which collector no longer works:
        -----
        2019-10-04 19:45:38 | ERR | Error submitting job "SNMPCollector.do_snmp (trigger: <collector.MultipleIntervalsTrigger object at 0x7fd866b9aee8>, next run at: 2019-10-04 19:45:38 UTC)" to executor "iaexecutor"
        Traceback (most recent call last):
          File "/usr/local/lib/python3.6/site-packages/apscheduler/schedulers/base.py", line 974, in _process_jobs
            executor.submit_job(job, run_times)
          File "/usr/local/lib/python3.6/site-packages/apscheduler/executors/base.py", line 71, in submit_job
            self._do_submit_job(job, run_times)
          File "./collector.py", line 92, in _do_submit_job
          File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 452, in submit
            raise BrokenProcessPool('A child process terminated '
        concurrent.futures.process.BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore
        -----

        The idea is that we remember that we are in this state, so that we can make Docker health check fail.
        """
        super()._run_job_error(job_id, exc, traceback)

        # name-based check (instead of isinstance) presumably to avoid importing the
        # exact exception class; matches concurrent.futures.process.BrokenProcessPool:
        if 'BrokenProcessPool' in exc.__class__.__name__:
            # this file is checked by the Docker health check and if it exists, container should be restarted:
            open('/tmp/fail_health_check', 'a').close()
184+
class Collector(object):
    """
    Base class for Grafolean bot collectors. A concrete collector implements
    `jobs()`; `execute()` then schedules those jobs via APScheduler (using
    `MultipleIntervalsTrigger` and `IntervalsAwareProcessPoolExecutor`) and keeps
    the scheduled set in sync by calling `refresh_jobs()` every
    `jobs_refresh_interval` seconds.
    """
    __slots__ = 'backend_url', 'bot_token', 'scheduler', 'known_jobs', 'jobs_refresh_interval'

    def __init__(self, backend_url, bot_token, jobs_refresh_interval):
        self.backend_url = backend_url  # base URL of the Grafolean backend API
        self.bot_token = bot_token  # bot token, sent as the `b` query parameter on every request
        self.jobs_refresh_interval = jobs_refresh_interval  # seconds between job list refreshes
        self.known_jobs = {}  # job_id -> job_data of currently scheduled jobs, to detect config changes

    @abstractmethod
    def jobs(self):
        """
        Returns a list of (job_id, intervals, job_func, job_data) tuples. Usually calls
        `fetch_job_configs` to get input data.
        """

    def fetch_job_configs(self, protocol):
        """
        Yields entity_info dicts, where entity_info is everything needed for collecting data
        from the entity - credentials and list of sensors (with intervals) for selected protocol,
        plus `account_id` and `entity_id`.
        The data is cleaned up as much as possible, so that it only contains the things necessary for collectors
        to do their job.
        """
        # find all the accounts we have access to:
        r = requests.get('{}/accounts/?b={}'.format(self.backend_url, self.bot_token))
        if r.status_code != 200:
            raise Exception("Invalid bot token or network error, got status {} while retrieving {}/accounts".format(r.status_code, self.backend_url))
        j = r.json()
        accounts_ids = [a["id"] for a in j["list"]]

        # find all entities for each of the accounts:
        for account_id in accounts_ids:
            r = requests.get('{}/accounts/{}/entities/?b={}'.format(self.backend_url, account_id, self.bot_token))
            if r.status_code != 200:
                raise Exception("Network error, got status {} while retrieving {}/accounts/{}/entities".format(r.status_code, self.backend_url, account_id))
            j = r.json()
            entities_ids = [e["id"] for e in j["list"]]

            for entity_id in entities_ids:
                r = requests.get('{}/accounts/{}/entities/{}?b={}'.format(self.backend_url, account_id, entity_id, self.bot_token))
                if r.status_code != 200:
                    raise Exception("Network error, got status {} while retrieving {}/accounts/{}/entities/{}".format(r.status_code, self.backend_url, account_id, entity_id))
                entity_info = r.json()

                # make sure that the protocol is enabled on the entity:
                if protocol not in entity_info["protocols"]:
                    continue
                # and that credential is set:
                if not entity_info["protocols"][protocol]["credential"]:
                    continue
                credential_id = entity_info["protocols"][protocol]["credential"]
                # and that there is at least one sensor enabled for this protocol:
                if not entity_info["protocols"][protocol]["sensors"]:
                    continue

                r = requests.get('{}/accounts/{}/credentials/{}?b={}'.format(self.backend_url, account_id, credential_id, self.bot_token))
                if r.status_code != 200:
                    raise Exception("Network error, got status {} while retrieving {}/accounts/{}/credentials/{}".format(r.status_code, self.backend_url, account_id, credential_id))
                credential = r.json()
                entity_info["credential_details"] = credential["details"]

                sensors = []
                for sensor_info in entity_info["protocols"][protocol]["sensors"]:
                    sensor_id = sensor_info["sensor"]
                    r = requests.get('{}/accounts/{}/sensors/{}?b={}'.format(self.backend_url, account_id, sensor_id, self.bot_token))
                    if r.status_code != 200:
                        # BUGFIX: this message previously formatted `sensor["sensor"]`, but
                        # `sensor` is only assigned below - raising here caused a NameError
                        # instead of the intended exception; use `sensor_id` instead.
                        raise Exception("Network error, got status {} while retrieving {}/accounts/{}/sensors/{}".format(r.status_code, self.backend_url, account_id, sensor_id))
                    sensor = r.json()

                    # determine interval, since this part is generic:
                    if sensor_info["interval"] is not None:
                        interval = sensor_info["interval"]
                    elif sensor["default_interval"] is not None:
                        interval = sensor["default_interval"]
                    else:
                        # logging.warn() is deprecated, logging.warning() is the documented API:
                        logging.warning("Interval not set, ignoring sensor {} on entity {}!".format(sensor_id, entity_id))
                        continue
                    del sensor["default_interval"]  # cleanup - nobody should need this anymore

                    sensors.append({
                        "sensor_details": sensor["details"],
                        "sensor_id": sensor_id,
                        "interval": interval,
                    })
                # and hide all other protocols, saving just sensors for selected one: (not strictly necessary, just cleaner)
                entity_info["sensors"] = sensors
                del entity_info["protocols"]

                entity_info["account_id"] = account_id
                entity_info["entity_id"] = entity_info["id"]
                del entity_info["id"]

                yield entity_info

    def refresh_jobs(self):
        """
        Synchronizes scheduler state with `self.jobs()`: (re)schedules new or
        changed jobs and removes jobs that are no longer wanted.
        """
        wanted_jobs = set()
        for job_id, intervals, job_func, job_data in self.jobs():
            wanted_jobs.add(job_id)
            # if the existing job's configuration is the same, leave it alone, otherwise the trigger will be reset:
            if self.known_jobs.get(job_id) == job_data:
                continue
            self.known_jobs[job_id] = job_data

            trigger = MultipleIntervalsTrigger(intervals)
            logging.info(f"Adding job: {job_id}")
            self.scheduler.add_job(job_func, id=job_id, trigger=trigger, executor='iaexecutor', kwargs=job_data, replace_existing=True)

        # remove any jobs that are currently running but are no longer wanted:
        existing_jobs = set(self.known_jobs.keys())
        to_be_removed = existing_jobs - wanted_jobs
        for job_id in to_be_removed:
            del self.known_jobs[job_id]
            self.scheduler.remove_job(job_id)

    def execute(self):
        """
        Calls self.jobs() to get the list of the jobs, and executes them by using
        `MultipleIntervalsTrigger`. Blocking.
        """
        # initialize APScheduler:
        job_defaults = {
            'coalesce': True,  # if multiple jobs "misfire", re-run only one instance of a missed job
            'max_instances': 1,
        }
        self.scheduler = BackgroundScheduler(job_defaults=job_defaults, timezone=utc)
        self.scheduler.add_executor(IntervalsAwareProcessPoolExecutor(10), 'iaexecutor')

        try:
            self.scheduler.start()
            while True:
                try:
                    self.refresh_jobs()
                except Exception:
                    # BUGFIX: this was a bare `except:`, which also swallowed
                    # KeyboardInterrupt / SystemExit raised during a refresh and
                    # prevented the clean-shutdown path below from ever running.
                    logging.exception("Error refreshing jobs.")
                time.sleep(self.jobs_refresh_interval)

        except KeyboardInterrupt:
            logging.info("Got exit signal, exiting.")
        finally:
            self.scheduler.shutdown()

0 commit comments

Comments
 (0)