diff --git a/Nagstamon/config.py b/Nagstamon/config.py index d1f1b969..6fbf6bf7 100644 --- a/Nagstamon/config.py +++ b/Nagstamon/config.py @@ -120,7 +120,9 @@ 'autologin_key', 'custom_cert_ca_file', 'idp_ecp_endpoint', - 'monitor_site' + 'monitor_site', + 'auth_helper_command', + 'auth_helper_extra_args' ] @@ -694,35 +696,24 @@ def save_multiple_config(self, settingsdir, setting): value = '' elif self.keyring_available and self.use_system_keyring: if self.__dict__[settingsdir][s].password != '': - # provoke crash if password saving does not work - this is the case - # on newer Ubuntu releases - try: - keyring.set_password('Nagstamon', - '@'.join((self.__dict__[settingsdir][s].username, - self.__dict__[settingsdir][s].monitor_url)), - self.__dict__[settingsdir][s].password) - except Exception: - import traceback - traceback.print_exc(file=sys.stdout) - sys.exit(1) - value = '' + keyring_account = '@'.join((self.__dict__[settingsdir][s].username, + self.__dict__[settingsdir][s].monitor_url)) + if self._keyring_set('Nagstamon', keyring_account, + self.__dict__[settingsdir][s].password): + value = '' + else: + value = '' if option == 'proxy_password': if self.keyring_available and self.use_system_keyring: if self.__dict__[settingsdir][s].proxy_password != '': - # provoke crash if password saving does not work - this is the case - # on newer Ubuntu releases - try: - keyring.set_password('Nagstamon', - '@'.join(('proxy', - self.__dict__[settingsdir][s].proxy_username, - self.__dict__[settingsdir][s].proxy_address)), - self.__dict__[settingsdir][s].proxy_password) - except Exception: - import traceback - traceback.print_exc(file=sys.stdout) - sys.exit(1) - - value = '' + keyring_account = '@'.join(('proxy', + self.__dict__[settingsdir][s].proxy_username, + self.__dict__[settingsdir][s].proxy_address)) + if self._keyring_set('Nagstamon', keyring_account, + self.__dict__[settingsdir][s].proxy_password): + value = '' + else: + value = '' config.set(setting + '_' + s, option, str(value)) else: config.set(setting + '_' + s, option, str(self.__dict__[settingsdir][s].__dict__[option])) @@ -749,6 +740,40 @@ def save_multiple_config(self, settingsdir, setting): # ## if not f.split(setting + "_")[1].split(".conf")[0] in self.__dict__[settingsdir]: # ## os.unlink(self.configdir + os.sep + settingsdir + os.sep + f) + def _keyring_set(self, service, account, password): + """ + Store a password in the system keyring. + + On macOS, handles errSecDuplicateItem (-25299) by deleting the old + entry and retrying once. + + Crashes the application if keyring storage ultimately fails, to prevent + insecure fallback to obfuscated config file storage (e.g. on newer + Ubuntu releases where keyring backends may be broken). + """ + import keyring + try: + keyring.set_password(service, account, password) + return True + except Exception: + # On macOS, Keychain may return errSecDuplicateItem (-25299). + # Delete the existing entry and retry once. + if OS == OS_MACOS: + try: + keyring.delete_password(service, account) + except Exception: + pass + try: + keyring.set_password(service, account, password) + return True + except Exception: + pass + # provoke crash if password saving does not work, this is the case + # on newer Ubuntu releases + import traceback + traceback.print_exc(file=sys.stdout) + sys.exit(1) + def is_keyring_available(self): """ Determines if the keyring module and a suitable backend are available for secure password storage. @@ -1131,6 +1156,11 @@ def __init__(self): # LibreNMS self.treat_services_as_alerts = False + # Auth helper - external command for authentication (OIDC, cookies, etc.) + self.use_auth_helper = False + self.auth_helper_command = '' + self.auth_helper_extra_args = '' + class Action: """ diff --git a/Nagstamon/qui/dialogs/server.py b/Nagstamon/qui/dialogs/server.py index 8063fff8..f9d00867 100644 --- a/Nagstamon/qui/dialogs/server.py +++ b/Nagstamon/qui/dialogs/server.py @@ -79,7 +79,11 @@ def __init__(self): self.window.input_checkbox_show_options: [self.window.groupbox_options], self.window.input_checkbox_custom_cert_use: [self.window.label_custom_ca_file, self.window.input_lineedit_custom_cert_ca_file, - self.window.button_choose_custom_cert_ca_file]} + self.window.button_choose_custom_cert_ca_file], + self.window.input_checkbox_use_auth_helper: [self.window.label_auth_helper_command, + self.window.input_lineedit_auth_helper_command, + self.window.label_auth_helper_extra_args, + self.window.input_lineedit_auth_helper_extra_args]} self.TOGGLE_DEPS_INVERTED = [self.window.input_checkbox_use_proxy_from_os] @@ -185,6 +189,9 @@ def __init__(self): # when authentication is changed to Kerberos then disable username/password as they are now useless self.window.input_combobox_authentication.activated.connect(self.toggle_authentication) + # when auth helper is toggled, hide/show username/password accordingly + self.window.input_checkbox_use_auth_helper.toggled.connect(self.toggle_authentication) + # reset Checkmk views self.window.button_checkmk_view_hosts_reset.clicked.connect(self.checkmk_view_hosts_reset) self.window.button_checkmk_view_services_reset.clicked.connect(self.checkmk_view_services_reset) @@ -243,6 +250,16 @@ def toggle_authentication(self): widget.show() self.window.button_delete_web_cookies.hide() + # no need for username + password or authentication type when using auth helper + if self.window.input_checkbox_use_auth_helper.isChecked(): + for widget in self.AUTHENTICATION_WIDGETS: + widget.hide() + self.window.label_auth_type.hide() + self.window.input_combobox_authentication.hide() + else: + self.window.label_auth_type.show() + self.window.input_combobox_authentication.show() + # after hiding authentication widgets dialog might shrink self.window.adjustSize() diff --git a/Nagstamon/qui/widgets/statuswindow.py b/Nagstamon/qui/widgets/statuswindow.py index 82093871..3faef5ad 100644 --- a/Nagstamon/qui/widgets/statuswindow.py +++ b/Nagstamon/qui/widgets/statuswindow.py @@ -595,6 +595,7 @@ def create_server_vbox(self, name): # display authentication dialog if password is not known if not conf.servers[server.name].save_password and \ not conf.servers[server.name].use_autologin and \ + not getattr(conf.servers[server.name], 'use_auth_helper', False) and \ conf.servers[server.name].password == '' and \ not conf.servers[server.name].authentication == 'kerberos' and \ not conf.servers[server.name].authentication == 'web': diff --git a/Nagstamon/resources/qui/settings_server.ui b/Nagstamon/resources/qui/settings_server.ui index 75858da2..e449942c 100644 --- a/Nagstamon/resources/qui/settings_server.ui +++ b/Nagstamon/resources/qui/settings_server.ui @@ -788,6 +788,53 @@ + + + + Use auth helper for authentication + + + + + + + + 0 + 0 + + + + Helper command: + + + + + + + e.g., uvx nagstamon-auth-helper + + + + + + + + 0 + 0 + + + + Extra arguments: + + + + + + + e.g., --client-id nagstamon --redirect-port 12345 + + + @@ -875,6 +922,9 @@ button_checkmk_view_services_reset input_lineedit_idp_ecp_endpoint input_lineedit_disabled_backends + input_checkbox_use_auth_helper + input_lineedit_auth_helper_command + input_lineedit_auth_helper_extra_args diff --git a/Nagstamon/servers/Generic.py b/Nagstamon/servers/Generic.py index 1e5f41fb..d4b19cad 100644 --- a/Nagstamon/servers/Generic.py +++ b/Nagstamon/servers/Generic.py @@ -22,7 +22,9 @@ import json from pathlib import Path import platform +import shlex import socket +import subprocess import sys import traceback import urllib.parse @@ -357,6 +359,10 @@ def create_session(self): cookies = load_cookies() session.cookies = cookie_data_to_jar(self.name, cookies) + # Auth helper provides its own credentials, skip built-in auth + if getattr(self, 'use_auth_helper', False) and getattr(self, 'auth_helper_command', ''): + session.auth = None + # default to check TLS validity if self.ignore_cert: session.verify = False @@ -413,6 +419,124 @@ def reset_http(self): if self.authentication != 'web': self.session = None + def _run_auth_helper(self, args): + """ + Run the auth helper command with given arguments. + Returns (stdout, stderr, returncode) tuple. + """ + cmd = shlex.split(self.auth_helper_command) + args + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=180) + if conf.debug_mode and result.stderr: + self.debug(server=self.get_name(), debug=f'[Auth helper stderr] {result.stderr.strip()}') + return result.stdout, result.stderr, result.returncode + except FileNotFoundError: + return '', f'Auth helper command not found: {self.auth_helper_command}', -1 + except subprocess.TimeoutExpired: + return '', 'Auth helper command timed out', -1 + except Exception as e: + return '', f'Auth helper error: {e}', -1 + + def _get_auth_helper_credentials(self): + """ + Call the external auth helper to get HTTP credentials (headers and/or cookies). + Returns a dict with 'headers' and optionally 'cookies' on success, + or None on failure (sets status_description). + Automatically triggers re-authentication if the helper signals exit code 1. + """ + stdout, stderr, rc = self._run_auth_helper([ + 'get-headers', '--server-name', self.name + ]) + + if rc == 0: + try: + data = json.loads(stdout) + return self._normalize_auth_helper_response(data) + except (json.JSONDecodeError, ValueError) as e: + self.status_description = f'Auth helper returned invalid JSON: {e}' + return None + + # Exit code 1 means re-authentication required, run 'authenticate' automatically + if rc == 1: + if conf.debug_mode: + self.debug(server=self.get_name(), + debug='[Auth helper] get-headers requires re-authentication, launching authenticate...') + + auth_args = [ + 'authenticate', + '--server-name', self.name, + '--monitor-url', self.monitor_url, + ] + # Append user-configured extra arguments (e.g. --client-id, --redirect-port) + extra = getattr(self, 'auth_helper_extra_args', '') + if extra: + auth_args.extend(shlex.split(extra)) + if getattr(self, 'ignore_cert', False): + auth_args.append('--insecure') + + auth_stdout, auth_stderr, auth_rc = self._run_auth_helper(auth_args) + + if auth_rc == 0: + # Retry get-headers after successful authentication + stdout, stderr, rc = self._run_auth_helper([ + 'get-headers', '--server-name', self.name + ]) + if rc == 0: + try: + data = json.loads(stdout) + return self._normalize_auth_helper_response(data) + except (json.JSONDecodeError, ValueError) as e: + self.status_description = f'Auth helper returned invalid JSON: {e}' + return None + + # Authentication failed or retry failed + error_detail = '' + try: + error_data = json.loads(auth_stdout or stdout) + error_detail = error_data.get('error', '') + except (json.JSONDecodeError, ValueError): + error_detail = auth_stderr or stderr + self.status_description = f'Auth helper authentication failed: {error_detail}' + return None + + # Any other exit code + error_detail = '' + try: + error_data = json.loads(stdout) + error_detail = error_data.get('error', stdout) + except (json.JSONDecodeError, ValueError): + error_detail = stderr or stdout + self.status_description = f'Auth helper error (exit {rc}): {error_detail}' + return None + + @staticmethod + def _normalize_auth_helper_response(data): + """ + Normalize the auth helper JSON response into {'headers': {...}, 'cookies': {...}}. + Accepts either the structured format or a flat dict (treated as headers-only). + """ + if 'headers' in data: + return {'headers': data['headers'], 'cookies': data.get('cookies', {})} + # Flat dict, treat entire response as headers + return {'headers': data, 'cookies': {}} + + def _apply_auth_helper_credentials(self): + """ + Apply auth helper credentials (headers and cookies) to the session. + Returns None on success, or a Result on failure (caller should return it). + """ + credentials = self._get_auth_helper_credentials() + if credentials is None: + return Result(result='ERROR', + error=self.status_description, + status_code=-1) + if self.session: + if credentials.get('headers'): + self.session.headers.update(credentials['headers']) + if credentials.get('cookies'): + self.session.cookies.update(credentials['cookies']) + return None + def get_name(self): """ return stringified name @@ -950,6 +1074,13 @@ def get_status(self, output=None): # initialize HTTP first self.init_http() + # apply auth helper credentials if configured + if getattr(self, 'use_auth_helper', False) and getattr(self, 'auth_helper_command', ''): + auth_result = self._apply_auth_helper_credentials() + if auth_result is not None: + self.isChecking = False + return auth_result + # get all trouble hosts/services from server specific _get_status() status = self._get_status() @@ -986,6 +1117,12 @@ def get_status(self, output=None): 'login failed' in self.status_description.lower() or \ self.status_code in self.STATUS_CODES_NO_AUTH: if conf.servers[self.name].enabled is True: + # Auth helper handles its own re-authentication, don't prompt for credentials + if getattr(self, 'use_auth_helper', False) and getattr(self, 'auth_helper_command', ''): + self.isChecking = False + return Result(result='ERROR', + error=self.status_description, + status_code=self.status_code) # needed to get valid credentials self.refresh_authentication = True # clean existent authentication diff --git a/Nagstamon/servers/IcingaDBWeb.py b/Nagstamon/servers/IcingaDBWeb.py index e86a50a5..c88422d0 100644 --- a/Nagstamon/servers/IcingaDBWeb.py +++ b/Nagstamon/servers/IcingaDBWeb.py @@ -101,6 +101,10 @@ def init_http(self): """ GenericServer.init_http(self) + # If session was not created (e.g. refresh_authentication is pending), bail out + if not self.session: + return + # Run cookie cleanup only once per application run / server instance (https://github.com/HenriWahl/Nagstamon/issues/1163) if not getattr(self, '_cookie_cleanup_done', False): self._cookie_cleanup_done = True diff --git a/Nagstamon/servers/__init__.py b/Nagstamon/servers/__init__.py index 7f028a57..1075a934 100644 --- a/Nagstamon/servers/__init__.py +++ b/Nagstamon/servers/__init__.py @@ -173,8 +173,10 @@ def create_server(server=None): new_server.idp_ecp_endpoint = server.idp_ecp_endpoint # if password is not to be saved ask for it at startup + # skip if auth helper is used, it provides its own authentication if (server.enabled is True and server.save_password is False and - server.use_autologin is False): + server.use_autologin is False and + server.use_auth_helper is False): new_server.refresh_authentication = True # Special FX @@ -229,6 +231,11 @@ def create_server(server=None): # Thruk new_server.disabled_backends = server.disabled_backends + # Auth helper + new_server.use_auth_helper = server.use_auth_helper + new_server.auth_helper_command = server.auth_helper_command + new_server.auth_helper_extra_args = server.auth_helper_extra_args + # server's individual preparations for HTTP connections (for example cookie creation) # is done in GetStatus() method of monitor if server.enabled is True: diff --git a/docs/Auth-Helper-Interface.md b/docs/Auth-Helper-Interface.md new file mode 100644 index 00000000..87785ba9 --- /dev/null +++ b/docs/Auth-Helper-Interface.md @@ -0,0 +1,255 @@ +# Auth Helper Interface Specification + +Nagstamon supports external auth helpers, standalone commands that handle +authentication independently of Nagstamon internals. This includes OIDC/OAuth2, +SAML, session cookies, API tokens, or any other scheme. This document defines +the CLI contract so anyone can build a compatible helper. + +An example implementation is included in +[docs/example-auth-helper/nagstamon-auth-helper](example-auth-helper/nagstamon-auth-helper), +implementing OIDC Authorization Code + PKCE via the system browser. + +## Overview + +The helper is a command-line tool that Nagstamon invokes via subprocess. It must support +two subcommands: `get-headers` (non-interactive) and `authenticate` (interactive, +e.g. opens a browser). + +Nagstamon calls `get-headers` before each polling cycle. If that fails with exit code 1, +Nagstamon automatically calls `authenticate` to trigger interactive login, then retries +`get-headers`. + +## Compatible Server Types + +The auth helper works by injecting HTTP headers and/or cookies into Nagstamon's +shared HTTP session. This is compatible with any server type that uses +`fetch_url()` for its HTTP communication. Server types that use their own +transport or API-level authentication will not benefit from the injected +credentials, unless an authenticating reverse proxy (e.g. OAuth2 Proxy, +Keycloak Gatekeeper) sits in front of the monitor. + +### Fully compatible + +These server types use Nagstamon's HTTP session for all requests. The helper +credentials are applied automatically: + +- IcingaDBWeb +- IcingaDBWebNotifications +- IcingaWeb2 +- Icinga +- Icinga2API +- Nagios +- Thruk +- op5Monitor +- Checkmk Multisite +- Prometheus +- Alertmanager +- Monitos3 +- monitos4x +- SNAG-View 3 + +### Compatible with caveats + +These server types use Nagstamon's HTTP session **but also perform their own +API-level authentication**. The helper credentials are sent alongside API +tokens, which works when the auth layer is handled by a reverse proxy in front +of the monitor: + +- Zabbix, authenticates via JSON-RPC `auth` field; helper headers are sent + but Zabbix itself ignores them unless a proxy validates them +- Opsview, authenticates via `X-Opsview-Token`; same caveat +- Centreon, authenticates via `X-Auth-Token`; same caveat +- LibreNMS, authenticates via `X-Auth-Token`; same caveat +- Sensu, uses a separate `SensuAPI` client; helper headers land on the session + but Sensu API calls go through their own transport + +### Not compatible + +These server types bypass HTTP entirely or use independent transport: + +- Livestatus, communicates via raw TCP sockets, not HTTP +- SensuGo, uses its own `SensuGoAPI` HTTP client, not `fetch_url()` +- Zenoss, uses its own `ZenossAPI` client, not `fetch_url()` + + +## Configuration + +Users configure these fields per server in Nagstamon settings: + +| Field | Description | Example | +|---|---|---| +| Helper command | The command to invoke | `nagstamon-oidc-helper` | +| Extra arguments | Appended to the `authenticate` invocation | `--client-id nagstamon --redirect-port 12345` | + +The helper command is split by shell rules (supports paths with spaces if quoted). +Extra arguments are also split by shell rules and appended to the `authenticate` +subcommand. Use this to pass helper-specific options like OIDC client IDs, +redirect ports, or any other parameters your helper needs. + + +## Subcommand: `get-headers` + +**Purpose:** Return HTTP headers and/or cookies as JSON. Called non-interactively +on every polling cycle. + +### Invocation + +``` + get-headers --server-name +``` + +| Argument | Required | Description | +|---|---|---| +| `--server-name` | Yes | Unique server name (as configured in Nagstamon) | + +### Success (exit code 0) + +Print a JSON object to **stdout**. Two formats are supported: + +**Structured format (recommended)**, return headers and/or cookies: + +```json +{ + "headers": {"Authorization": "Bearer eyJhbGciOi..."}, + "cookies": {"KEYCLOAK_SESSION": "abc123", "Icingaweb2": "xyz789"} +} +``` + +**Flat format**, return headers only (treated as a headers-only response): + +```json +{"Authorization": "Bearer eyJhbGciOi...", "X-OAuth2": "1"} +``` + +Requirements: +- Must be a valid JSON object +- In structured format: `headers` is required, `cookies` is optional +- In flat format: the entire object is treated as HTTP headers +- Nagstamon merges headers into `session.headers` and cookies into + `session.cookies` + +### Re-authentication needed (exit code 1) + +Signals that tokens are missing, expired, or refresh failed. Nagstamon will +automatically call the `authenticate` subcommand. + +Stdout may optionally contain: +```json +{"status": "error", "error": "No tokens found, run 'authenticate' first"} +``` + +### Other errors (exit code 2+) + +Any other exit code is treated as a configuration or infrastructure error. +Nagstamon will display the error and retry on the next cycle. + + +## Subcommand: `authenticate` + +**Purpose:** Perform interactive authentication (e.g. open a browser for OIDC login). +Called automatically when `get-headers` returns exit code 1. + +### Invocation + +``` + authenticate --server-name --monitor-url [...] [--insecure] +``` + +Nagstamon always passes `--server-name` and `--monitor-url`. Any extra arguments +configured in the server settings are appended. `--insecure` is added when +"Ignore SSL/TLS certificate" is checked. + +| Argument | Source | Description | +|---|---|---| +| `--server-name` | Always | Unique server name (for token storage) | +| `--monitor-url` | Always | Monitor server base URL | +| `` | Server config | User-configured extra arguments (e.g. `--client-id`, `--redirect-port`) | +| `--insecure` | Auto | Present when TLS verification is disabled | + +### Success (exit code 0) + +Print a JSON status to **stdout**: + +```json +{"status": "ok", "expires_at": 1773658714} +``` + +| Field | Type | Description | +|---|---|---| +| `status` | string | `"ok"` on success | +| `expires_at` | number | Unix timestamp when the token expires (informational) | + +After success, Nagstamon retries `get-headers` which should now return valid +credentials. + +### Failure (exit code 1) + +Authentication was cancelled or failed: + +```json +{"status": "error", "error": "Timeout waiting for browser login"} +``` + +### Configuration error (exit code 2) + +Discovery or setup failed: + +```json +{"status": "error", "error": "Could not discover OIDC provider"} +``` + + +## Verbose / Debug Output + +The helper should write debug/verbose output to **stderr** only. Nagstamon captures +stdout for JSON parsing. When Nagstamon is in debug mode, stderr output from the +helper is logged. + + +## Timeout + +Nagstamon sets a subprocess timeout of **180 seconds** for both subcommands. The +`authenticate` command should implement its own timeout for the interactive flow, +typically 120 seconds, and exit with code 1 if it expires. + + +## Token / Credential Storage + +Credential persistence is the helper's responsibility. Nagstamon does not store +or manage tokens, it only consumes the headers and cookies returned by +`get-headers`. + +A recommended location is `~/.config/nagstamon-auth/.json` with +`0600` file permissions. + + +## Lifecycle Summary + +``` +Nagstamon polling cycle + │ + ├─ Call: get-headers --server-name "my-server" + │ + ├─ Exit 0 ──► Parse JSON ──► Apply headers + cookies to session ──► Poll monitor + │ + └─ Exit 1 ──► Call: authenticate --server-name "my-server" \ + │ --monitor-url "https://..." + │ + │ ├─ Exit 0 ──► Retry get-headers ──► Continue polling + │ └─ Exit 1/2 ──► Show error ──► Retry next cycle + │ + └─ Exit 2+ ──► Show error ──► Retry next cycle +``` + + +## Example Implementation + +A complete, self-contained OIDC auth helper is included in +[example-auth-helper/nagstamon-auth-helper](example-auth-helper/nagstamon-auth-helper). +It implements Authorization Code + PKCE via the system browser and can be used +as-is or as a starting point for custom helpers. + +Nagstamon configuration: +- **Helper command:** `python3 /path/to/nagstamon-auth-helper` +- **Extra arguments:** `--client-id nagstamon --redirect-port 12345` + diff --git a/docs/example-auth-helper/nagstamon-auth-helper b/docs/example-auth-helper/nagstamon-auth-helper new file mode 100755 index 00000000..cd90768c --- /dev/null +++ b/docs/example-auth-helper/nagstamon-auth-helper @@ -0,0 +1,599 @@ +#!/usr/bin/env python3 +""" +Example auth helper for Nagstamon, OIDC Authorization Code + PKCE. + +This is a self-contained, single-file helper that implements the Nagstamon +Auth Helper CLI contract (see docs/Auth-Helper-Interface.md). It performs +OIDC login via the system browser and returns Bearer tokens as HTTP headers. + +Usage: + # Interactive login (opens browser) + nagstamon-auth-helper authenticate --server-name myserver \ + --monitor-url https://icinga.example.com \ + --client-id nagstamon --redirect-port 12345 + + # Non-interactive, returns JSON headers (auto-refreshes tokens) + nagstamon-auth-helper get-headers --server-name myserver + +Requirements: + pip install requests + +Token storage: + ~/.config/nagstamon-auth/.json (0600 permissions) + +This file is meant as a starting point. Adapt it to your environment: +you may need to change the OIDC discovery logic, scopes, or redirect +handling to match your identity provider. +""" + +from __future__ import annotations + +import argparse +import base64 +import hashlib +import html +import json +import os +import secrets +import stat +import sys +import threading +import time +import webbrowser +from http.server import BaseHTTPRequestHandler, HTTPServer +from typing import Any +from urllib.parse import parse_qs, urlencode, urljoin, urlparse + +import requests + +USER_AGENT = "NagstamonAuthHelper/1.0" + + +# --------------------------------------------------------------------------- +# Token storage +# --------------------------------------------------------------------------- + +def _store_dir() -> str: + """Return the token storage directory, respecting XDG_CONFIG_HOME.""" + xdg = os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")) + return os.path.join(xdg, "nagstamon-auth") + + +def _safe_name(server_name: str) -> str: + """Sanitize server name for use as a filename.""" + return "".join(c if c.isalnum() or c in "-_." else "_" for c in server_name) + + +def _token_path(server_name: str) -> str: + return os.path.join(_store_dir(), f"{_safe_name(server_name)}.json") + + +def save_tokens(server_name: str, token_data: dict[str, Any]) -> None: + """Save token data to disk with restrictive permissions (0600).""" + path = _token_path(server_name) + os.makedirs(os.path.dirname(path), exist_ok=True) + fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, stat.S_IRUSR | stat.S_IWUSR) + with os.fdopen(fd, "w") as f: + json.dump(token_data, f, indent=2) + + +def load_tokens(server_name: str) -> dict[str, Any] | None: + """Load token data from disk, or None if not found.""" + path = _token_path(server_name) + if not os.path.exists(path): + return None + with open(path) as f: + return json.load(f) + + +# --------------------------------------------------------------------------- +# OIDC discovery +# --------------------------------------------------------------------------- + +class DiscoveryError(Exception): + pass + + +def discover_oidc( + monitor_url: str, + session: requests.Session | None = None, + timeout: int = 15, + verbose: bool = False, +) -> dict[str, Any]: + """ + Auto-discover OIDC issuer and endpoints from the monitor's login redirect. + + Follows HTTP redirects from ``monitor_url`` until it finds a URL with a + ``client_id`` query parameter (the OIDC authorization endpoint). Then + derives the issuer URL and fetches .well-known/openid-configuration. + + If your identity provider does not redirect from the monitor URL, you can + replace this function with a simpler implementation that returns a dict + with ``authorization_endpoint`` and ``token_endpoint`` directly. + """ + if session is None: + session = requests.Session() + session.headers.setdefault("User-Agent", USER_AGENT) + + login_url = monitor_url.rstrip("/") + if verbose: + print(f"[discovery] fetching {login_url}", file=sys.stderr) + + try: + response = session.get(login_url, allow_redirects=False, timeout=timeout) + except requests.RequestException as e: + raise DiscoveryError(f"Failed to connect to {login_url}: {e}") from e + + redirect_url = None + current_response = response + current_url = login_url + + for i in range(10): + if current_response.status_code not in (301, 302, 303, 307, 308): + break + location = current_response.headers.get("Location", "") + if not location: + break + if not location.startswith(("http://", "https://")): + location = urljoin(current_url, location) + redirect_url = location + + if verbose: + print(f"[discovery] redirect #{i + 1} -> {redirect_url}", file=sys.stderr) + + parsed = urlparse(redirect_url) + params = parse_qs(parsed.query) + if "client_id" in params: + if verbose: + print(f"[discovery] found OIDC endpoint", file=sys.stderr) + break + + try: + current_url = redirect_url + current_response = session.get(redirect_url, allow_redirects=False, timeout=timeout) + except requests.RequestException: + break + + if not redirect_url: + raise DiscoveryError( + f"No OIDC redirect found at {login_url}. " + "Make sure the monitor server redirects to your OIDC provider." + ) + + # Derive issuer URL from the redirect + parsed = urlparse(redirect_url) + issuer_path = parsed.path + for marker in ("/protocol/openid-connect", "/.well-known"): + idx = parsed.path.find(marker) + if idx != -1: + issuer_path = parsed.path[:idx] + break + issuer_url = f"{parsed.scheme}://{parsed.netloc}{issuer_path}" + + if verbose: + print(f"[discovery] issuer URL: {issuer_url}", file=sys.stderr) + + well_known_url = f"{issuer_url}/.well-known/openid-configuration" + if verbose: + print(f"[discovery] fetching {well_known_url}", file=sys.stderr) + + try: + wk_response = session.get(well_known_url, timeout=timeout) + wk_response.raise_for_status() + oidc_config = wk_response.json() + except (requests.RequestException, ValueError) as e: + raise DiscoveryError(f"Failed to fetch OIDC config from {well_known_url}: {e}") from e + + for key in ("authorization_endpoint", "token_endpoint"): + if key not in oidc_config: + raise DiscoveryError(f"OIDC configuration missing required key: {key}") + + return { + "issuer": oidc_config.get("issuer", issuer_url), + "authorization_endpoint": oidc_config["authorization_endpoint"], + "token_endpoint": oidc_config["token_endpoint"], + "end_session_endpoint": oidc_config.get("end_session_endpoint", ""), + } + + +# --------------------------------------------------------------------------- +# OIDC Authorization Code + PKCE flow +# --------------------------------------------------------------------------- + +class AuthError(Exception): + pass + + +class TokenError(AuthError): + pass + + +class TokenResult: + def __init__( + self, + access_token: str, + refresh_token: str | None = None, + id_token: str | None = None, + expires_at: float | None = None, + ) -> None: + self.access_token = access_token + self.refresh_token = refresh_token + self.id_token = id_token + self.expires_at = expires_at + + +def _generate_pkce() -> tuple[str, str]: + """Generate PKCE code_verifier and code_challenge (S256) per RFC 7636.""" + code_verifier = secrets.token_urlsafe(64) + digest = hashlib.sha256(code_verifier.encode("ascii")).digest() + code_challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii") + return code_verifier, code_challenge + + +class _CallbackHandler(BaseHTTPRequestHandler): + """HTTP handler that captures the OIDC authorization code callback.""" + + server: _CallbackServer # type: ignore[assignment] + + def do_GET(self) -> None: + parsed = urlparse(self.path) + params = parse_qs(parsed.query) + + if parsed.path != "/callback": + self.send_error(404) + return + + if "error" in params: + error = html.escape(params["error"][0]) + desc = html.escape(params.get("error_description", [""])[0]) + self.server.auth_error = f"{error}: {desc}" + self.server.auth_code = None + self._send_html("Authentication failed. You can close this tab.", success=False) + elif "code" in params: + self.server.auth_code = params["code"][0] + self.server.auth_state = params.get("state", [None])[0] + self.server.auth_error = None + self._send_html("Authentication successful! You can close this tab.", success=True) + else: + self.server.auth_error = "No authorization code received" + self.server.auth_code = None + self._send_html("Authentication failed. You can close this tab.", success=False) + + self.server.auth_event.set() + + def _send_html(self, message: str, success: bool) -> None: + icon = "✔" if success else "✘" + color = "#2e7d32" if success else "#c62828" + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.end_headers() + body = ( + "" + "Nagstamon Auth" + '' + "" + f'
{icon}
' + f"

{html.escape(message)}

" + "

This tab will close automatically...

" + "
" + "" + "" + ) + self.wfile.write(body.encode("utf-8")) + + def log_message(self, format: str, *args: Any) -> None: # noqa: A002 + pass # suppress default HTTP log noise + + +class _CallbackServer(HTTPServer): + """HTTPServer with SO_REUSEADDR for immediate port reuse.""" + + allow_reuse_address = True + allow_reuse_port = True + + auth_code: str | None + auth_state: str | None + auth_error: str | None + auth_event: threading.Event + + +def oidc_authenticate( + authorization_endpoint: str, + token_endpoint: str, + client_id: str = "nagstamon", + redirect_port: int = 12345, + scope: str = "openid", + timeout: int = 120, + verbose: bool = False, +) -> TokenResult: + """ + Run the OIDC Authorization Code + PKCE flow. + + Opens the system browser for user login, waits for the callback, + and exchanges the authorization code for tokens. + """ + code_verifier, code_challenge = _generate_pkce() + state = secrets.token_urlsafe(32) + redirect_uri = f"http://localhost:{redirect_port}/callback" + + auth_params = { + "client_id": client_id, + "response_type": "code", + "redirect_uri": redirect_uri, + "scope": scope, + "state": state, + "code_challenge": code_challenge, + "code_challenge_method": "S256", + } + auth_url = f"{authorization_endpoint}?{urlencode(auth_params)}" + + if verbose: + print(f"[auth] starting callback server on port {redirect_port}", file=sys.stderr) + + try: + server = _CallbackServer(("127.0.0.1", redirect_port), _CallbackHandler) + except OSError as e: + raise AuthError(f"Port {redirect_port} already in use") from e + + server.auth_code = None + server.auth_state = None + server.auth_error = None + server.auth_event = threading.Event() + + server_thread = threading.Thread( + target=lambda: server.serve_forever(poll_interval=0.5), daemon=True + ) + server_thread.start() + + if verbose: + print("[auth] opening system browser for login", file=sys.stderr) + webbrowser.open(auth_url) + + if not server.auth_event.wait(timeout=timeout): + server.shutdown() + server.server_close() + server_thread.join(timeout=5) + raise AuthError(f"Authentication timed out after {timeout} seconds") + + server.shutdown() + server.server_close() + server_thread.join(timeout=5) + + if server.auth_error: + raise AuthError(f"Authentication error: {server.auth_error}") + if not server.auth_code: + raise AuthError("No authorization code received") + if server.auth_state != state: + raise AuthError("State mismatch, possible CSRF attack") + + if verbose: + print("[auth] received authorization code, exchanging for tokens...", file=sys.stderr) + + return _exchange_code( + server.auth_code, redirect_uri, code_verifier, + token_endpoint, client_id, verbose=verbose, + ) + + +def _exchange_code( + code: str, redirect_uri: str, code_verifier: str, + token_endpoint: str, client_id: str, verbose: bool = False, +) -> TokenResult: + """Exchange authorization code for tokens at the token endpoint.""" + session = requests.Session() + session.headers["User-Agent"] = USER_AGENT + try: + response = session.post(token_endpoint, data={ + "grant_type": "authorization_code", + "client_id": client_id, + "code": code, + "redirect_uri": redirect_uri, + "code_verifier": code_verifier, + }, timeout=15) + if verbose: + print(f"[auth] token exchange response: {response.status_code}", file=sys.stderr) + response.raise_for_status() + token_json = response.json() + except (requests.RequestException, ValueError) as e: + raise TokenError(f"Token exchange failed: {e}") from e + + access_token = token_json.get("access_token") + if not access_token: + raise TokenError("No access_token in token response") + + expires_in = token_json.get("expires_in", 300) + if verbose: + print(f"[auth] token exchange OK, expires_in={expires_in}s", file=sys.stderr) + + return TokenResult( + access_token=access_token, + refresh_token=token_json.get("refresh_token"), + id_token=token_json.get("id_token"), + expires_at=time.time() + int(expires_in), + ) + + +def refresh_token( + refresh_tok: str, token_endpoint: str, client_id: str = "nagstamon", + verbose: bool = False, +) -> TokenResult: + """Use a refresh_token to obtain a new access_token.""" + session = requests.Session() + session.headers["User-Agent"] = USER_AGENT + try: + response = session.post(token_endpoint, data={ + "grant_type": "refresh_token", + "client_id": client_id, + "refresh_token": refresh_tok, + }, timeout=15) + if verbose: + print(f"[auth] refresh response: {response.status_code}", file=sys.stderr) + response.raise_for_status() + token_json = response.json() + except (requests.RequestException, ValueError) as e: + raise TokenError(f"Token refresh failed: {e}") from e + + access_token = token_json.get("access_token") + if not access_token: + raise TokenError("No access_token in refresh response") + + expires_in = token_json.get("expires_in", 300) + if verbose: + print(f"[auth] refresh OK, expires_in={expires_in}s", file=sys.stderr) + + return TokenResult( + access_token=access_token, + refresh_token=token_json.get("refresh_token", refresh_tok), + id_token=token_json.get("id_token"), + expires_at=time.time() + int(expires_in), + ) + + +# --------------------------------------------------------------------------- +# CLI commands +# --------------------------------------------------------------------------- + +def cmd_authenticate(args: argparse.Namespace) -> None: + """Discover OIDC endpoints, open browser, exchange code, persist tokens.""" + verbose = args.verbose + + try: + if verbose: + print(f"[cli] discovering OIDC for {args.monitor_url}", file=sys.stderr) + + session = None + if args.insecure: + session = requests.Session() + session.verify = False + + oidc_config = discover_oidc( + args.monitor_url, session=session, verbose=verbose, + ) + + if verbose: + print(f"[cli] auth_endpoint={oidc_config['authorization_endpoint']}", file=sys.stderr) + print(f"[cli] token_endpoint={oidc_config['token_endpoint']}", file=sys.stderr) + + result = oidc_authenticate( + authorization_endpoint=oidc_config["authorization_endpoint"], + token_endpoint=oidc_config["token_endpoint"], + client_id=args.client_id, + redirect_port=args.redirect_port, + timeout=args.timeout, + verbose=verbose, + ) + + token_data = { + "access_token": result.access_token, + "refresh_token": result.refresh_token or "", + "id_token": result.id_token or "", + "expires_at": result.expires_at, + "token_endpoint": oidc_config["token_endpoint"], + "authorization_endpoint": oidc_config["authorization_endpoint"], + "client_id": args.client_id, + "monitor_url": args.monitor_url, + } + save_tokens(args.server_name, token_data) + + output: dict[str, Any] = {"status": "ok", "expires_at": result.expires_at} + print(json.dumps(output)) + + if verbose: + remaining = (result.expires_at or 0) - time.time() + print(f"[cli] authenticated, token expires in {remaining:.0f}s", file=sys.stderr) + + except DiscoveryError as e: + print(json.dumps({"status": "error", "error": str(e)})) + sys.exit(2) + except AuthError as e: + print(json.dumps({"status": "error", "error": str(e)})) + sys.exit(1) + + +def cmd_get_headers(args: argparse.Namespace) -> None: + """Load persisted tokens, auto-refresh if needed, print headers as JSON.""" + verbose = args.verbose + + token_data = load_tokens(args.server_name) + if not token_data or not token_data.get("access_token"): + print(json.dumps({"status": "error", "error": "No tokens found, run 'authenticate' first"})) + sys.exit(1) + + # Auto-refresh if token expires within 60 seconds + expires_at = token_data.get("expires_at", 0) + if time.time() > expires_at - 60: + refresh_tok = token_data.get("refresh_token") + token_endpoint = token_data.get("token_endpoint") + client_id = token_data.get("client_id", "nagstamon") + + if not refresh_tok or not token_endpoint: + print(json.dumps({ + "status": "error", + "error": "Token expired and no refresh_token, run 'authenticate' again", + })) + sys.exit(1) + + if verbose: + print("[cli] token expired/expiring, refreshing...", file=sys.stderr) + + try: + result = refresh_token(refresh_tok, token_endpoint, client_id, verbose=verbose) + token_data["access_token"] = result.access_token + token_data["refresh_token"] = result.refresh_token or refresh_tok + token_data["expires_at"] = result.expires_at + save_tokens(args.server_name, token_data) + + if verbose: + remaining = (result.expires_at or 0) - time.time() + print(f"[cli] refreshed, new expiry in {remaining:.0f}s", file=sys.stderr) + except TokenError as e: + print(json.dumps({"status": "error", "error": f"Refresh failed: {e}"})) + sys.exit(1) + + # Return structured response with headers (and cookies if needed) + print(json.dumps({ + "headers": { + "Authorization": f"Bearer {token_data['access_token']}", + }, + })) + + +def main() -> None: + parser = argparse.ArgumentParser( + prog="nagstamon-auth-helper", + description="Example OIDC auth helper for Nagstamon", + ) + parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output to stderr") + subparsers = parser.add_subparsers(dest="command", required=True) + + # authenticate + p_auth = subparsers.add_parser("authenticate", help="Interactive OIDC login via browser") + p_auth.add_argument("--server-name", required=True, help="Unique server name (for token storage)") + p_auth.add_argument("--monitor-url", required=True, help="Monitor server base URL") + p_auth.add_argument("--client-id", default="nagstamon", help="OIDC client ID (default: nagstamon)") + p_auth.add_argument("--redirect-port", type=int, default=12345, help="Local callback port (default: 12345)") + p_auth.add_argument("--timeout", type=int, default=120, help="Browser auth timeout in seconds (default: 120)") + p_auth.add_argument("--insecure", action="store_true", help="Skip TLS certificate verification") + + # get-headers + p_headers = subparsers.add_parser("get-headers", help="Get HTTP auth headers (auto-refreshes)") + p_headers.add_argument("--server-name", required=True, help="Server name (must match authenticate)") + + args = parser.parse_args() + if args.command == "authenticate": + cmd_authenticate(args) + elif args.command == "get-headers": + cmd_get_headers(args) + + +if __name__ == "__main__": + main()