
update exporter to work asynchronously #33


Open · wants to merge 2 commits into base: master
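
At a glance, the two commits drop the thread-based UPSMultiExporter and make UPSExporter itself scrape every configured UPS concurrently: collect() enters the event loop with asyncio.run() and scrape_data() awaits all scrapers via asyncio.gather(). Below is a minimal, self-contained sketch of that pattern only; FakeScraper and its metric values are placeholders, not code from this repository.

# Minimal sketch (not repository code): a synchronous Prometheus collect()
# driving several async scrapers with asyncio.run() and asyncio.gather().
import asyncio


class FakeScraper:
    def __init__(self, name: str) -> None:
        self.name = name

    async def get_measures(self) -> dict:
        await asyncio.sleep(0.1)  # stand-in for the HTTP round trips
        return {"ups_id": self.name, "ups_battery_capacity": 100.0}


class DemoExporter:
    def __init__(self, scrapers) -> None:
        self.upss = scrapers

    async def scrape_data(self):
        # All devices are queried concurrently, mirroring the new scrape_data()
        return await asyncio.gather(*[ups.get_measures() for ups in self.upss])

    def collect(self):
        # collect() stays synchronous for prometheus_client; it just runs the loop
        for measures in asyncio.run(self.scrape_data()):
            yield measures


if __name__ == "__main__":
    exporter = DemoExporter([FakeScraper("ups1"), FakeScraper("ups2")])
    print(list(exporter.collect()))
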
prometheus_eaton_ups_exporter.py (5 changes: 2 additions & 3 deletions)
@@ -16,7 +16,7 @@

from prometheus_client import start_http_server, REGISTRY
from prometheus_eaton_ups_exporter.scraper_globals import REQUEST_TIMEOUT
from prometheus_eaton_ups_exporter.exporter import UPSMultiExporter
from prometheus_eaton_ups_exporter.exporter import UPSExporter

DEFAULT_PORT = 9795
DEFAULT_HOST = "0.0.0.0"
@@ -134,11 +134,10 @@ def run(args: Namespace) -> None:
host_address, port = split_listen_address(listen_address)

REGISTRY.register(
UPSMultiExporter(
UPSExporter(
args.config,
insecure=args.insecure,
verbose=args.verbose,
threading=args.threading,
login_timeout=args.login_timeout
)
)
prometheus_eaton_ups_exporter/exporter.py (89 changes: 10 additions & 79 deletions)
@@ -1,14 +1,13 @@
"""Create and run a Prometheus Exporter for an Eaton UPS."""
import asyncio
import json

from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures._base import TimeoutError
from prometheus_client.core import GaugeMetricFamily

from prometheus_eaton_ups_exporter import create_logger
from prometheus_eaton_ups_exporter.scraper import UPSScraper

from typing import Generator, Tuple
from typing import Generator

NORMAL_EXECUTION = 0

@@ -29,28 +28,22 @@ class UPSExporter:
"""
def __init__(
self,
ups_address: str,
authentication: Tuple[str, str],
name: str | None = None,
config: str,
insecure: bool = False,
verbose: bool = False,
login_timeout: int = 3
) -> None:
self.logger = create_logger(
f"{__name__}.{self.__class__.__name__}", not verbose
)
self.ups_scraper = UPSScraper(
ups_address,
authentication,
name,
insecure=insecure,
verbose=verbose,
login_timeout=login_timeout
)
self.insecure = insecure
self.verbose = verbose
self.login_timeout = login_timeout
self.upss = self.get_ups_devices(config)

def collect(self) -> Generator[GaugeMetricFamily, None, None]:
"""Export UPS metrics on request."""
ups_data = self.scrape_data()
ups_data = asyncio.run(self.scrape_data())
for measures in ups_data:
if not measures:
continue
@@ -192,52 +185,6 @@ def collect(self) -> Generator[GaugeMetricFamily, None, None]:
gauge.add_metric([ups_id], health)
yield gauge

def scrape_data(self):
"""Scrape measure data.

:return: measures
"""
yield self.ups_scraper.get_measures()


class UPSMultiExporter(UPSExporter):
"""Prometheus exporter for multiple UPSs.

Collects metrics from multiple UPSs at the same time. If threading is
enabled, multiple threads will be used to collect sensor readings which is
considerably faster.

:param config: str
Path to the configuration file, containing UPS ip/hostname, username,
and password combinations for all UPSs to be monitored
:param insecure: bool
Whether to connect to UPSs with self-signed SSL certificates
:param threading: bool
Whether to use multiple threads to scrape the data 'parallel'.
This is surely the best way to increase the speed
:param verbose: bool
Allow logging output for development
:param login_timeout: int
Login timeout for authentication
"""

def __init__(
self,
config: str,
insecure: bool = False,
threading: bool = False,
verbose: bool = False,
login_timeout: int = 3
) -> None:
self.logger = create_logger(
f"{__name__}.{self.__class__.__name__}", not verbose
)
self.insecure = insecure
self.threading = threading
self.verbose = verbose
self.login_timeout = login_timeout
self.ups_devices = self.get_ups_devices(config)

@staticmethod
def get_devices(config: str | dict) -> dict:
"""Take a config file path or config dict of UPSs."""
@@ -260,7 +207,6 @@ def get_ups_devices(self,
List of UPSScrapers
"""
devices = self.get_devices(config)

return [
UPSScraper(
value['address'],
@@ -273,24 +219,9 @@
for key, value in devices.items()
]

def scrape_data(self):
async def scrape_data(self):
"""Scrape measure data.

:return: measures
"""
if self.threading:
with ThreadPoolExecutor() as executor:
futures = [
executor.submit(ups.get_measures)
for ups in self.ups_devices
]
try:
for future in as_completed(futures, self.login_timeout+1):
yield future.result()
except TimeoutError as err:
self.logger.exception(err)
yield None

else:
for ups in self.ups_devices:
yield ups.get_measures()
return await asyncio.gather(*[ups.get_measures() for ups in self.upss])
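
Since collect() now calls asyncio.run() internally, the registration path in prometheus_eaton_ups_exporter.py stays unchanged apart from the class name. A hedged sketch of that wiring follows; only REGISTRY.register(), start_http_server(), GaugeMetricFamily, and port 9795 mirror calls that appear in the diff, while StubCollector, the metric name, and its value are invented for illustration.

# Sketch only: serving a collector whose scrape is async internally.
import asyncio
import time

from prometheus_client import REGISTRY, start_http_server
from prometheus_client.core import GaugeMetricFamily


class StubCollector:
    async def scrape_data(self):
        await asyncio.sleep(0)  # placeholder for the UPS REST requests
        return [{"ups_id": "demo", "powered_by_battery": 0.0}]

    def collect(self):
        for measures in asyncio.run(self.scrape_data()):
            gauge = GaugeMetricFamily(
                "eaton_ups_powered_by_battery",  # invented metric name
                "Illustrative gauge",
                labels=["ups_id"],
            )
            gauge.add_metric([measures["ups_id"]], measures["powered_by_battery"])
            yield gauge


if __name__ == "__main__":
    REGISTRY.register(StubCollector())
    start_http_server(9795)  # DEFAULT_PORT in prometheus_eaton_ups_exporter.py
    while True:
        time.sleep(60)
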
prometheus_eaton_ups_exporter/scraper.py (45 changes: 24 additions & 21 deletions)
Expand Up @@ -2,6 +2,8 @@
import json

from requests import Session, Response
import httpx

from requests.exceptions import (
ConnectionError,
InvalidURL,
@@ -55,7 +57,7 @@ def __init__(self,
name: str | None = None,
insecure: bool = False,
verbose: bool = False,
login_timeout: int = 3) -> None:
login_timeout: int = 10) -> None:
self.ups_address = ups_address
self.username, self.password = authentication
self.name = name
@@ -72,7 +74,7 @@ def __init__(self,

self.token_type, self.access_token = None, None

def login(self) -> Tuple[str, str]:
async def login(self) -> Tuple[str, str]:
"""
Login to the UPS Web UI.

@@ -87,10 +89,11 @@ def login(self) -> Tuple[str, str]:
data["username"] = self.username
data["password"] = self.password

login_request = self.session.post(
login_request = httpx.post(
self.ups_address + LOGIN_AUTH_PATH,
data=json.dumps(data), # needs to be JSON encoded
timeout=self.login_timeout
timeout=self.login_timeout,
verify=False
)
login_response = login_request.json()

@@ -141,8 +144,8 @@ def login(self) -> Tuple[str, str]:
"Invalid URL, no host supplied"
) from None

def load_page(self,
url: bytes | str) -> Response:
async def load_page(self,
url: bytes | str) -> Response:
"""
Load a webpage of the UPS Web UI or API.

@@ -159,27 +162,27 @@ def load_page(self,
}

try:
request = self.session.get(
request = httpx.get(
url,
headers=headers,
timeout=REQUEST_TIMEOUT
timeout=REQUEST_TIMEOUT,
verify=False
)

# Session might be expired, connect again
try:
if "errorCode" in request.json() or "code" in request.json():
self.logger.debug('Session expired, reconnect')
self.token_type, self.access_token = self.login()
return self.load_page(url)
self.token_type, self.access_token = await self.login()
return await self.load_page(url)
except ValueError:
pass

# try to login, if not authorized
if "Unauthorized" in request.text:
self.logger.debug('Unauthorized, try to login')
try:
self.token_type, self.access_token = self.login()
return self.load_page(url)
self.token_type, self.access_token = await self.login()
return await self.load_page(url)
except LoginFailedException as err:
if err.error_code == TIMEOUT_ERROR:
raise LoginFailedException(
@@ -194,14 +197,14 @@ def load_page(self,
except ConnectionError:
self.logger.debug('Connection Error try to login again')
try:
self.token_type, self.access_token = self.login()
return self.load_page(url)
self.token_type, self.access_token = await self.login()
return await self.load_page(url)
except LoginFailedException:
raise
except LoginFailedException:
raise

def get_measures(self) -> dict:
async def get_measures(self) -> dict:
"""
Get most relevant UPS metrics.

@@ -214,7 +217,7 @@ def get_measures(self) -> dict:
"""
measurements = dict()
try:
power_dist_request = self.load_page(
power_dist_request = await self.load_page(
self.ups_address+REST_API_PATH
)
power_dist_overview = power_dist_request.json()
@@ -225,23 +228,23 @@ def get_measures(self) -> dict:
ups_inputs_api = power_dist_overview['inputs']['@id']
ups_ouptups_api = power_dist_overview['outputs']['@id']

inputs_request = self.load_page(
inputs_request = await self.load_page(
self.ups_address + ups_inputs_api + f'/{INPUT_MEMBER_ID}'
)
inputs = inputs_request.json()

outputs_request = self.load_page(
outputs_request = await self.load_page(
self.ups_address + ups_ouptups_api + f'/{OUTPUT_MEMBER_ID}'
)
outputs = outputs_request.json()

ups_backup_sys_api = power_dist_overview['backupSystem']['@id']
backup_request = self.load_page(
backup_request = await self.load_page(
self.ups_address + ups_backup_sys_api
)
backup = backup_request.json()
ups_powerbank_api = backup['powerBank']['@id']
powerbank_request = self.load_page(
powerbank_request = await self.load_page(
self.ups_address + ups_powerbank_api
)
powerbank = powerbank_request.json()
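
Because login(), load_page() and get_measures() are coroutines after this change, a caller outside an event loop has to drive them with asyncio.run(). A hypothetical usage sketch follows; the address, credentials and UPS name are made up, and only the constructor arguments follow the signature shown above.

# Hypothetical caller of the now-async scraper; address and credentials are fake.
import asyncio

from prometheus_eaton_ups_exporter.scraper import UPSScraper

scraper = UPSScraper(
    "https://ups.example.com",      # made-up UPS address
    ("monitoring-user", "secret"),  # made-up (username, password) tuple
    "example-ups",
    insecure=True,
    verbose=True,
    login_timeout=10,
)
measures = asyncio.run(scraper.get_measures())
print(measures)
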
pyproject.toml (3 changes: 3 additions & 0 deletions)
@@ -17,10 +17,12 @@ classifiers = [
"Programming Language :: Python :: 3",
]
dependencies = [
'httpx',
'prometheus_client',
'requests',
]


[project.urls]
"Bug Reports" = "https://github.com/psyinfra/prometheus-eaton-ups-exporter/issues"
"Source" = "https://github.com/psyinfra/prometheus-eaton-ups-exporter/"
@@ -30,6 +32,7 @@ tests = [
'flake8',
'pyre-check',
'pytest == 7.2.1',
'pytest-asyncio',
'pytest-vcr',
]
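
The new pytest-asyncio test dependency suggests the suite will await the coroutines directly. A hypothetical example follows; the ups_scraper fixture and the assertion are assumptions, not code taken from this PR.

# Hypothetical async test enabled by pytest-asyncio; fixture and assertion
# are illustrative only.
import pytest


@pytest.mark.asyncio
async def test_get_measures(ups_scraper):
    measures = await ups_scraper.get_measures()
    assert isinstance(measures, dict)
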

tests/test_prometheus_api_exporter.py (39 changes: 0 additions & 39 deletions)
@@ -2,45 +2,6 @@
Testing the Exporter using the UPSExporter and UPSMultiExporter.
"""
import pytest
from . import first_ups_details
from prometheus_eaton_ups_exporter.exporter import (
UPSExporter,
UPSMultiExporter,
)


# Create Multi Exporter
@pytest.fixture(scope="function")
def single_exporter(ups_scraper_conf) -> UPSExporter:
address, auth, ups_name = first_ups_details(ups_scraper_conf)
return UPSExporter(
address,
auth,
ups_name,
insecure=True,
verbose=True
)


# Create Multi Exporter
@pytest.fixture(scope="function")
def multi_exporter(ups_scraper_conf) -> UPSMultiExporter:
return UPSMultiExporter(
ups_scraper_conf,
insecure=True,
verbose=True
)


# Create Multi Exporter
@pytest.fixture(scope="function")
def threading_multi_exporter(ups_scraper_conf) -> UPSMultiExporter:
return UPSMultiExporter(
ups_scraper_conf,
insecure=True,
verbose=True,
threading=True
)


@pytest.mark.vcr()