diff --git a/CMakeLists.txt b/CMakeLists.txt index d9a9191..6ac3612 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,6 +8,8 @@ set(THERMAL_SIMD_DISPATCHER_CPU_FLAGS "-msse4.1" CACHE STRING "CPU-specific comp include(CTest) +find_package(OpenSSL REQUIRED) + file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/include/thermal/simd) configure_file( ${CMAKE_CURRENT_SOURCE_DIR}/include/thermal/simd/version.h.in @@ -32,6 +34,8 @@ set(THERMAL_SIMD_CORE_SOURCES src/telemetry/fusion.cpp src/telemetry/fusion_bridge.cpp src/observability/metrics.cpp + src/observability/statsd_exporter.cpp + src/observability/telemetry_state.cpp src/thermal_config.c src/thermal_cpu.c src/patcher/trampoline.cpp @@ -57,6 +61,8 @@ target_compile_definitions(thermal_simd_core TSD_DEFAULT_COEFF_PATH="${CMAKE_CURRENT_SOURCE_DIR}/config/controller_coeffs.json" ) +target_link_libraries(thermal_simd_core PRIVATE pthread OpenSSL::SSL OpenSSL::Crypto) + add_executable(thermal_simd src/main.cpp src/thermal_simd.c) target_compile_options(thermal_simd PRIVATE -O2 -pthread -fPIC -mno-avx ${THERMAL_SIMD_DISPATCHER_CPU_FLAGS}) target_link_libraries(thermal_simd PRIVATE thermal_simd_core pthread) @@ -89,6 +95,7 @@ if(BUILD_TESTING) PRIVATE TSD_DEFAULT_COEFF_PATH="${CMAKE_CURRENT_SOURCE_DIR}/config/controller_coeffs.json" ) + target_link_libraries(thermal_simd_core_tests PUBLIC OpenSSL::SSL OpenSSL::Crypto) add_executable(test_config_parser tests/test_config_parser.c) target_link_libraries(test_config_parser PRIVATE thermal_simd_core_tests) @@ -158,6 +165,11 @@ if(BUILD_TESTING) target_compile_options(test_integration_metrics_tuner PRIVATE -Wall -Wextra) add_test(NAME integration_metrics_tuner COMMAND test_integration_metrics_tuner) + add_executable(test_observability_metrics tests/observability/test_metrics_exporter.cpp) + target_link_libraries(test_observability_metrics PRIVATE thermal_simd_core_tests pthread OpenSSL::SSL OpenSSL::Crypto) + target_compile_options(test_observability_metrics PRIVATE -Wall 
-Wextra -pthread) + add_test(NAME observability_metrics COMMAND test_observability_metrics) + add_library(tsd_stress_common STATIC tests/stress/stress_common.c src/thermal_simd.c) diff --git a/config/certs/README.md b/config/certs/README.md new file mode 100644 index 0000000..8e1b54e --- /dev/null +++ b/config/certs/README.md @@ -0,0 +1,5 @@ +# Metrics Certificates + +Place production certificates and private keys here when configuring the dispatcher metrics server. The test suite +ships self-signed certificates under `tests/observability/certs/` that can be copied into this directory for local +smoke testing, but **do not** deploy them to production environments. diff --git a/config/observability/README.md b/config/observability/README.md new file mode 100644 index 0000000..68d66eb --- /dev/null +++ b/config/observability/README.md @@ -0,0 +1,12 @@ +# Observability Configuration + +The `metrics.example.json` file documents how to configure the dispatcher metrics server. + +* `bind_address`/`port` configure where the metrics listener binds (HTTPS when `tls` is configured, plain HTTP otherwise). A port of `0` selects an ephemeral port. +* `tls` enables TLS when `certificate` and `private_key` reference PEM-encoded files. Supplying a `client_ca` + sets the CA bundle used to verify client certificates; set `require_client_auth` to `true` to enforce mutual TLS. +* `basic_auth` secures the endpoints using HTTP basic authentication. Populate the `username` and `password` fields + with values deployed alongside the dispatcher. +* `statsd` enables the StatsD exporter and points it at an upstream aggregator. + +Copy the example into your deployment's configuration management system and replace the placeholder credential paths. 
diff --git a/config/observability/metrics.example.json b/config/observability/metrics.example.json new file mode 100644 index 0000000..6fe1599 --- /dev/null +++ b/config/observability/metrics.example.json @@ -0,0 +1,20 @@ +{ + "metrics": { + "bind_address": "0.0.0.0", + "port": 9443, + "tls": { + "certificate": "config/certs/dispatcher.crt", + "private_key": "config/certs/dispatcher.key", + "client_ca": "config/certs/ca.crt", + "require_client_auth": false + }, + "basic_auth": { + "username": "metrics", + "password": "change-me" + }, + "statsd": { + "host": "127.0.0.1", + "port": 8125 + } + } +} diff --git a/include/observability/metrics_exporter.h b/include/observability/metrics_exporter.h index b0baa66..bc7fb3f 100644 --- a/include/observability/metrics_exporter.h +++ b/include/observability/metrics_exporter.h @@ -9,6 +9,28 @@ extern "C" { #endif +typedef struct tsd_metrics_tls_config_s { + const char *certificate_path; + const char *private_key_path; + const char *ca_certificate_path; + int require_client_auth; +} tsd_metrics_tls_config_t; + +typedef struct tsd_metrics_basic_auth_s { + const char *username; + const char *password; +} tsd_metrics_basic_auth_t; + +typedef struct tsd_metrics_exporter_config_s { + const char *bind_address; + uint16_t port; + const tsd_metrics_tls_config_t *tls; + const tsd_metrics_basic_auth_t *basic_auth; + const char *statsd_host; + uint16_t statsd_port; +} tsd_metrics_exporter_config_t; + +int tsd_metrics_exporter_start_with_config(const tsd_metrics_exporter_config_t *config); int tsd_metrics_exporter_start(const char *bind_address, uint16_t port); void tsd_metrics_exporter_stop(void); uint16_t tsd_metrics_exporter_listen_port(void); diff --git a/include/observability/statsd_exporter.h b/include/observability/statsd_exporter.h new file mode 100644 index 0000000..9784c7f --- /dev/null +++ b/include/observability/statsd_exporter.h @@ -0,0 +1,38 @@ +#pragma once + +#include +#include + +#include + +#include + +namespace 
observability { + +class StatsdExporter { +public: + static StatsdExporter &instance(); + + void configure(const std::string &host, uint16_t port); + void shutdown(); + + void send_counter(const std::string &name, uint64_t value); + void send_gauge(const std::string &name, double value); + +private: + StatsdExporter(); + ~StatsdExporter(); + StatsdExporter(const StatsdExporter &) = delete; + StatsdExporter &operator=(const StatsdExporter &) = delete; + + void send(const std::string &payload); + + int socket_; + bool configured_; + struct sockaddr_storage destination_; + socklen_t destination_len_; + std::mutex mutex_; +}; + +} // namespace observability + diff --git a/include/observability/telemetry_state.h b/include/observability/telemetry_state.h new file mode 100644 index 0000000..7e836b0 --- /dev/null +++ b/include/observability/telemetry_state.h @@ -0,0 +1,87 @@ +#pragma once + +#include + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct tsd_controller_telemetry_s { + int fallback_active; + simd_width_t current_width; + simd_width_t recommended_width; + int issued_change; +} tsd_controller_telemetry_t; + +typedef struct tsd_fusion_telemetry_s { + int running; + int degraded; + int temp_available; + double package_temp_c; + int freq_available; + double freq_ratio; + int cpi_available; + double thermal_cpi; + int power_available; + double power_budget_w; +} tsd_fusion_telemetry_t; + +void tsd_observability_update_controller(const tsd_controller_telemetry_t *telemetry); +void tsd_observability_update_fusion(const tsd_fusion_telemetry_t *telemetry); + +#ifdef __cplusplus +} +#endif + +#ifdef __cplusplus +#include +#include +#include + +namespace observability { + +struct ControllerTelemetrySnapshot { + bool fallback_active{false}; + simd_width_t current_width{SIMD_SSE41}; + simd_width_t recommended_width{SIMD_SSE41}; + bool issued_change{false}; + std::chrono::system_clock::time_point updated_at{std::chrono::system_clock::time_point{}}; +}; 
+ +struct FusionTelemetrySnapshot { + bool running{false}; + bool degraded{false}; + bool temp_available{false}; + double package_temp_c{0.0}; + bool freq_available{false}; + double freq_ratio{0.0}; + bool cpi_available{false}; + double thermal_cpi{0.0}; + bool power_available{false}; + double power_budget_w{0.0}; + std::chrono::system_clock::time_point updated_at{std::chrono::system_clock::time_point{}}; +}; + +class TelemetryState { +public: + static TelemetryState &instance(); + + void update_controller(const tsd_controller_telemetry_t *telemetry); + void update_fusion(const tsd_fusion_telemetry_t *telemetry); + + ControllerTelemetrySnapshot controller_snapshot() const; + FusionTelemetrySnapshot fusion_snapshot() const; + +private: + TelemetryState() = default; + + mutable std::mutex mutex_; + ControllerTelemetrySnapshot controller_{}; + FusionTelemetrySnapshot fusion_{}; +}; + +} // namespace observability +#endif + diff --git a/src/observability/metrics.cpp b/src/observability/metrics.cpp index 2476902..cc63e18 100644 --- a/src/observability/metrics.cpp +++ b/src/observability/metrics.cpp @@ -1,18 +1,28 @@ #include +#include +#include + #include #include #include #include +#include +#include + #include #include +#include #include +#include #include #include +#include #include #include #include +#include namespace { @@ -31,6 +41,19 @@ const char *width_to_string(simd_width_t width) { } } +std::string sanitize_for_statsd(const std::string &value) { + std::string result; + result.reserve(value.size()); + for (char ch : value) { + if ((ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z') || (ch >= '0' && ch <= '9') || ch == '.' 
|| ch == '-' || ch == '_') { + result.push_back(ch); + } else { + result.push_back('_'); + } + } + return result; +} + struct PatchKey { simd_width_t from; simd_width_t to; @@ -72,6 +95,168 @@ struct SensorSnapshot { Clock::time_point updated_at{Clock::now()}; }; +class MetricsRegistry { +public: + static MetricsRegistry &instance() { + static MetricsRegistry registry; + return registry; + } + + void record_patch(simd_width_t from, simd_width_t to, int rc, uint64_t dwell_ms) { + const bool success = (rc == 0); + std::string statsd_name = std::string("tsd.patch_transition.") + width_to_string(from) + "." + width_to_string(to) + (success ? ".success" : ".failure"); + { + std::lock_guard lock(mutex_); + PatchKey key{from, to, success}; + patch_counts_[key] += 1; + if (dwell_ms > 0) { + DwellStats &stats = dwell_stats_[from]; + stats.count += 1; + stats.total_ms += dwell_ms; + stats.max_ms = std::max(stats.max_ms, dwell_ms); + } + } + observability::StatsdExporter::instance().send_counter(statsd_name, 1); + if (dwell_ms > 0) { + std::string dwell_metric = std::string("tsd.dwell.observed.") + width_to_string(from); + observability::StatsdExporter::instance().send_gauge(dwell_metric, static_cast(dwell_ms)); + } + } + + void observe_dwell(simd_width_t width, uint64_t dwell_ms) { + if (dwell_ms == 0) { + return; + } + { + std::lock_guard lock(mutex_); + DwellStats &stats = dwell_stats_[width]; + stats.count += 1; + stats.total_ms += dwell_ms; + stats.max_ms = std::max(stats.max_ms, dwell_ms); + } + std::string dwell_metric = std::string("tsd.dwell_time.") + width_to_string(width); + observability::StatsdExporter::instance().send_gauge(dwell_metric, static_cast(dwell_ms)); + } + + void record_sensor_health(const std::string &sensor, int socket, double health, double quality, bool valid) { + SensorKey key{sensor, socket}; + { + std::lock_guard lock(mutex_); + SensorSnapshot snapshot; + snapshot.health = std::clamp(health, 0.0, 1.0); + snapshot.quality = std::clamp(quality, 
0.0, 1.0); + snapshot.valid = valid; + snapshot.updated_at = Clock::now(); + sensor_state_[key] = snapshot; + } + std::string base = std::string("tsd.sensor.") + sanitize_for_statsd(sensor) + ".socket" + std::to_string(socket); + observability::StatsdExporter::instance().send_gauge(base + ".health", health); + observability::StatsdExporter::instance().send_gauge(base + ".quality", quality); + observability::StatsdExporter::instance().send_gauge(base + ".valid", valid ? 1.0 : 0.0); + } + + std::string build_prometheus() { + std::lock_guard lock(mutex_); + std::ostringstream body; + + body << "# HELP tsd_patch_transitions_total Dispatcher patch attempts by outcome.\n"; + body << "# TYPE tsd_patch_transitions_total counter\n"; + for (const auto &entry : patch_counts_) { + const PatchKey &key = entry.first; + body << "tsd_patch_transitions_total{from=\"" << width_to_string(key.from) + << "\",to=\"" << width_to_string(key.to) << "\",outcome=\"" + << (key.success ? "success" : "failure") << "\"} " << entry.second << "\n"; + } + + body << "# HELP tsd_dwell_time_ms_sum Total dwell time observed before transitions.\n"; + body << "# TYPE tsd_dwell_time_ms_sum counter\n"; + for (const auto &entry : dwell_stats_) { + body << "tsd_dwell_time_ms_sum{width=\"" << width_to_string(entry.first) + << "\"} " << entry.second.total_ms << "\n"; + } + + body << "# HELP tsd_dwell_time_ms_count Number of dwell observations.\n"; + body << "# TYPE tsd_dwell_time_ms_count counter\n"; + for (const auto &entry : dwell_stats_) { + body << "tsd_dwell_time_ms_count{width=\"" << width_to_string(entry.first) + << "\"} " << entry.second.count << "\n"; + } + + body << "# HELP tsd_dwell_time_ms_max Maximum dwell time observed.\n"; + body << "# TYPE tsd_dwell_time_ms_max gauge\n"; + for (const auto &entry : dwell_stats_) { + body << "tsd_dwell_time_ms_max{width=\"" << width_to_string(entry.first) + << "\"} " << entry.second.max_ms << "\n"; + } + + body << "# HELP tsd_sensor_health_ratio Last reported 
sensor health ratio.\n"; + body << "# TYPE tsd_sensor_health_ratio gauge\n"; + for (const auto &entry : sensor_state_) { + body << "tsd_sensor_health_ratio{sensor=\"" << entry.first.sensor + << "\",socket=\"" << entry.first.socket << "\"} " + << entry.second.health << "\n"; + } + + body << "# HELP tsd_sensor_quality_ratio Last reported sensor quality ratio.\n"; + body << "# TYPE tsd_sensor_quality_ratio gauge\n"; + for (const auto &entry : sensor_state_) { + body << "tsd_sensor_quality_ratio{sensor=\"" << entry.first.sensor + << "\",socket=\"" << entry.first.socket << "\"} " + << entry.second.quality << "\n"; + } + + body << "# HELP tsd_sensor_health_valid Last reported sensor validity (1=valid).\n"; + body << "# TYPE tsd_sensor_health_valid gauge\n"; + for (const auto &entry : sensor_state_) { + body << "tsd_sensor_health_valid{sensor=\"" << entry.first.sensor + << "\",socket=\"" << entry.first.socket << "\"} " + << (entry.second.valid ? 1 : 0) << "\n"; + } + + body << "# HELP tsd_sensor_health_timestamp_seconds UNIX time of last sensor report.\n"; + body << "# TYPE tsd_sensor_health_timestamp_seconds gauge\n"; + for (const auto &entry : sensor_state_) { + auto secs = std::chrono::duration_cast(entry.second.updated_at.time_since_epoch()); + body << "tsd_sensor_health_timestamp_seconds{sensor=\"" << entry.first.sensor + << "\",socket=\"" << entry.first.socket << "\"} " + << secs.count() << "\n"; + } + + return body.str(); + } + +private: + MetricsRegistry() = default; + + std::mutex mutex_; + std::map patch_counts_; + std::map dwell_stats_; + std::map sensor_state_; +}; + +struct ExporterTlsConfig { + bool enabled{false}; + std::string certificate; + std::string key; + std::string ca; + bool require_client_auth{false}; +}; + +struct ExporterBasicAuth { + bool enabled{false}; + std::string username; + std::string password; +}; + +struct ExporterConfig { + std::string bind_address{"127.0.0.1"}; + uint16_t port{0}; + ExporterTlsConfig tls; + ExporterBasicAuth auth; + 
std::string statsd_host; + uint16_t statsd_port{0}; +}; + class PrometheusExporter { public: static PrometheusExporter &instance() { @@ -79,12 +264,17 @@ class PrometheusExporter { return exporter; } - int start(const std::string &bind_address, uint16_t port) { + int start(const ExporterConfig &config) { std::lock_guard lock(server_mutex_); if (running_) { return -1; } + ExporterConfig local_config = config; + if (local_config.bind_address.empty()) { + local_config.bind_address = "127.0.0.1"; + } + int fd = ::socket(AF_INET, SOCK_STREAM, 0); if (fd < 0) { return -1; @@ -95,15 +285,11 @@ class PrometheusExporter { sockaddr_in addr{}; addr.sin_family = AF_INET; - if (bind_address.empty()) { - addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - } else { - if (::inet_pton(AF_INET, bind_address.c_str(), &addr.sin_addr) != 1) { - ::close(fd); - return -1; - } + if (::inet_pton(AF_INET, local_config.bind_address.c_str(), &addr.sin_addr) != 1) { + ::close(fd); + return -1; } - addr.sin_port = htons(port); + addr.sin_port = htons(local_config.port); if (::bind(fd, reinterpret_cast(&addr), sizeof(addr)) != 0) { ::close(fd); @@ -121,10 +307,18 @@ class PrometheusExporter { return -1; } + if (!initialize_tls(local_config.tls)) { + ::close(fd); + return -1; + } + + observability::StatsdExporter::instance().configure(local_config.statsd_host, local_config.statsd_port); + + listen_fd_ = fd; listen_port_ = ntohs(actual.sin_port); - bind_address_ = bind_address.empty() ? 
std::string("127.0.0.1") : bind_address; + bind_address_ = local_config.bind_address; + config_ = local_config; running_ = true; - listen_fd_ = fd; server_thread_ = std::thread(&PrometheusExporter::serve, this); return 0; } @@ -137,6 +331,7 @@ class PrometheusExporter { return; } running_ = false; + observability::StatsdExporter::instance().shutdown(); if (listen_fd_ >= 0) { ::shutdown(listen_fd_, SHUT_RDWR); ::close(listen_fd_); @@ -147,6 +342,7 @@ class PrometheusExporter { if (local_thread.joinable()) { local_thread.join(); } + destroy_tls(); } uint16_t listen_port() const { @@ -154,121 +350,242 @@ class PrometheusExporter { return listen_port_; } - void record_patch(simd_width_t from, simd_width_t to, int rc, uint64_t dwell_ms) { - { - std::lock_guard lock(metrics_mutex_); - PatchKey key{from, to, rc == 0}; - patch_counts_[key] += 1; - if (dwell_ms > 0) { - dwell_stats_[from].count += 1; - dwell_stats_[from].total_ms += dwell_ms; - dwell_stats_[from].max_ms = std::max(dwell_stats_[from].max_ms, dwell_ms); + MetricsRegistry ®istry() { return MetricsRegistry::instance(); } + +private: + PrometheusExporter() = default; + ~PrometheusExporter() { stop(); } + PrometheusExporter(const PrometheusExporter &) = delete; + PrometheusExporter &operator=(const PrometheusExporter &) = delete; + + bool initialize_tls(const ExporterTlsConfig &tls) { + if (!tls.enabled) { + destroy_tls(); + use_tls_ = false; + return true; + } + + static std::once_flag init_once; + std::call_once(init_once, [] { SSL_library_init(); SSL_load_error_strings(); OpenSSL_add_all_algorithms(); }); + + SSL_CTX *ctx = SSL_CTX_new(TLS_server_method()); + if (!ctx) { + return false; + } + SSL_CTX_set_min_proto_version(ctx, TLS1_2_VERSION); + if (SSL_CTX_use_certificate_file(ctx, tls.certificate.c_str(), SSL_FILETYPE_PEM) <= 0) { + SSL_CTX_free(ctx); + return false; + } + if (SSL_CTX_use_PrivateKey_file(ctx, tls.key.c_str(), SSL_FILETYPE_PEM) <= 0) { + SSL_CTX_free(ctx); + return false; + } + if 
(!tls.ca.empty()) { + if (SSL_CTX_load_verify_locations(ctx, tls.ca.c_str(), nullptr) <= 0) { + SSL_CTX_free(ctx); + return false; } } + if (tls.require_client_auth) { + if (tls.ca.empty()) { + SSL_CTX_free(ctx); + return false; + } + SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, nullptr); + } + + destroy_tls(); + ssl_ctx_ = ctx; + use_tls_ = true; + return true; } - void observe_dwell(simd_width_t width, uint64_t dwell_ms) { - if (dwell_ms == 0) { - return; + void destroy_tls() { + if (ssl_ctx_) { + SSL_CTX_free(ssl_ctx_); + ssl_ctx_ = nullptr; } - std::lock_guard lock(metrics_mutex_); - DwellStats &stats = dwell_stats_[width]; - stats.count += 1; - stats.total_ms += dwell_ms; - stats.max_ms = std::max(stats.max_ms, dwell_ms); } - void record_sensor_health(const std::string &sensor, - int socket, - double health, - double quality, - bool valid) { - std::lock_guard lock(metrics_mutex_); - SensorKey key{sensor, socket}; - SensorSnapshot snapshot; - snapshot.health = std::clamp(health, 0.0, 1.0); - snapshot.quality = std::clamp(quality, 0.0, 1.0); - snapshot.valid = valid; - snapshot.updated_at = Clock::now(); - sensor_state_[key] = snapshot; + static std::string base64_decode(const std::string &input) { + static const std::string alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + std::string output; + output.reserve((input.size() * 3) / 4); + int val = 0; + int valb = -8; + for (unsigned char c : input) { + if (std::isspace(c)) { + continue; + } + if (c == '=') { + break; + } + auto pos = alphabet.find(c); + if (pos == std::string::npos) { + return std::string(); + } + val = (val << 6) + static_cast(pos); + valb += 6; + if (valb >= 0) { + output.push_back(static_cast((val >> valb) & 0xFF)); + valb -= 8; + } + } + return output; } - std::string build_metrics() { - std::lock_guard lock(metrics_mutex_); - std::ostringstream body; - - body << "# HELP tsd_patch_transitions_total Dispatcher patch attempts by 
outcome.\n"; - body << "# TYPE tsd_patch_transitions_total counter\n"; - for (const auto &entry : patch_counts_) { - const PatchKey &key = entry.first; - body << "tsd_patch_transitions_total{from=\"" << width_to_string(key.from) - << "\",to=\"" << width_to_string(key.to) << "\",outcome=\"" - << (key.success ? "success" : "failure") << "\"} " << entry.second << "\n"; + struct HttpRequest { + std::string method; + std::string path; + std::map headers; + }; + + static std::optional parse_request(const std::string &buffer) { + HttpRequest request; + std::istringstream stream(buffer); + std::string line; + if (!std::getline(stream, line)) { + return std::nullopt; } - - body << "# HELP tsd_dwell_time_ms_sum Total dwell time observed before transitions.\n"; - body << "# TYPE tsd_dwell_time_ms_sum counter\n"; - for (const auto &entry : dwell_stats_) { - body << "tsd_dwell_time_ms_sum{width=\"" << width_to_string(entry.first) - << "\"} " << entry.second.total_ms << "\n"; + if (!line.empty() && line.back() == '\r') { + line.pop_back(); } - - body << "# HELP tsd_dwell_time_ms_count Number of dwell observations.\n"; - body << "# TYPE tsd_dwell_time_ms_count counter\n"; - for (const auto &entry : dwell_stats_) { - body << "tsd_dwell_time_ms_count{width=\"" << width_to_string(entry.first) - << "\"} " << entry.second.count << "\n"; + std::istringstream first(line); + if (!(first >> request.method >> request.path)) { + return std::nullopt; } - - body << "# HELP tsd_dwell_time_ms_max Maximum dwell time observed.\n"; - body << "# TYPE tsd_dwell_time_ms_max gauge\n"; - for (const auto &entry : dwell_stats_) { - body << "tsd_dwell_time_ms_max{width=\"" << width_to_string(entry.first) - << "\"} " << entry.second.max_ms << "\n"; + while (std::getline(stream, line)) { + if (!line.empty() && line.back() == '\r') { + line.pop_back(); + } + if (line.empty()) { + break; + } + auto colon = line.find(':'); + if (colon == std::string::npos) { + continue; + } + std::string key = line.substr(0, 
colon); + std::string value = line.substr(colon + 1); + while (!value.empty() && std::isspace(static_cast(value.front()))) { + value.erase(value.begin()); + } + std::transform(key.begin(), key.end(), key.begin(), [](unsigned char ch) { return static_cast(std::tolower(ch)); }); + request.headers[key] = value; } + return request; + } - body << "# HELP tsd_sensor_health_ratio Last reported sensor health ratio.\n"; - body << "# TYPE tsd_sensor_health_ratio gauge\n"; - for (const auto &entry : sensor_state_) { - body << "tsd_sensor_health_ratio{sensor=\"" << entry.first.sensor - << "\",socket=\"" << entry.first.socket << "\"} " - << entry.second.health << "\n"; + static bool send_payload(int client, SSL *ssl, const std::string &payload) { + const char *data = payload.data(); + size_t remaining = payload.size(); + while (remaining > 0) { + ssize_t sent = ssl ? SSL_write(ssl, data, static_cast(remaining)) : ::send(client, data, remaining, 0); + if (sent <= 0) { + return false; + } + data += static_cast(sent); + remaining -= static_cast(sent); } + return true; + } - body << "# HELP tsd_sensor_quality_ratio Last reported sensor quality ratio.\n"; - body << "# TYPE tsd_sensor_quality_ratio gauge\n"; - for (const auto &entry : sensor_state_) { - body << "tsd_sensor_quality_ratio{sensor=\"" << entry.first.sensor - << "\",socket=\"" << entry.first.socket << "\"} " - << entry.second.quality << "\n"; + static std::string read_request_payload(int client, SSL *ssl) { + std::string buffer; + buffer.reserve(1024); + char chunk[512]; + while (buffer.find("\r\n\r\n") == std::string::npos) { + ssize_t received = ssl ? 
SSL_read(ssl, chunk, sizeof(chunk)) : ::recv(client, chunk, sizeof(chunk), 0); + if (received <= 0) { + break; + } + buffer.append(chunk, static_cast(received)); + if (buffer.size() > 8192) { + break; + } } + return buffer; + } - body << "# HELP tsd_sensor_health_valid Last reported sensor validity (1=valid).\n"; - body << "# TYPE tsd_sensor_health_valid gauge\n"; - for (const auto &entry : sensor_state_) { - body << "tsd_sensor_health_valid{sensor=\"" << entry.first.sensor - << "\",socket=\"" << entry.first.socket << "\"} " - << (entry.second.valid ? 1 : 0) << "\n"; + bool authorized(const HttpRequest &request) const { + if (!config_.auth.enabled) { + return true; } - - body << "# HELP tsd_sensor_health_timestamp_seconds UNIX time of last sensor report.\n"; - body << "# TYPE tsd_sensor_health_timestamp_seconds gauge\n"; - for (const auto &entry : sensor_state_) { - auto secs = std::chrono::duration_cast( - entry.second.updated_at.time_since_epoch()); - body << "tsd_sensor_health_timestamp_seconds{sensor=\"" << entry.first.sensor - << "\",socket=\"" << entry.first.socket << "\"} " - << secs.count() << "\n"; + auto it = request.headers.find("authorization"); + if (it == request.headers.end()) { + return false; + } + const std::string &value = it->second; + const std::string prefix = "Basic "; + if (value.size() <= prefix.size() || value.compare(0, prefix.size(), prefix) != 0) { + return false; } + std::string decoded = base64_decode(value.substr(prefix.size())); + std::string expected = config_.auth.username + ":" + config_.auth.password; + return decoded == expected; + } - return body.str(); + std::string build_health_json() const { + auto controller = observability::TelemetryState::instance().controller_snapshot(); + auto fusion = observability::TelemetryState::instance().fusion_snapshot(); + auto controller_secs = std::chrono::duration_cast(controller.updated_at.time_since_epoch()).count(); + auto fusion_secs = 
std::chrono::duration_cast(fusion.updated_at.time_since_epoch()).count(); + + std::ostringstream json; + json << "{\"controller\":{\"fallbackActive\":" << (controller.fallback_active ? "true" : "false") + << ",\"currentWidth\":\"" << width_to_string(controller.current_width) << "\"" + << ",\"recommendedWidth\":\"" << width_to_string(controller.recommended_width) << "\"" + << ",\"issuedChange\":" << (controller.issued_change ? "true" : "false") + << ",\"updatedAtSeconds\":" << controller_secs << "}," + << "\"fusion\":{\"running\":" << (fusion.running ? "true" : "false") + << ",\"degraded\":" << (fusion.degraded ? "true" : "false") + << ",\"tempAvailable\":" << (fusion.temp_available ? "true" : "false") + << ",\"packageTempC\":" << std::fixed << std::setprecision(2) << fusion.package_temp_c + << ",\"freqAvailable\":" << (fusion.freq_available ? "true" : "false") + << ",\"freqRatio\":" << fusion.freq_ratio + << ",\"cpiAvailable\":" << (fusion.cpi_available ? "true" : "false") + << ",\"thermalCpi\":" << fusion.thermal_cpi + << ",\"powerAvailable\":" << (fusion.power_available ? 
"true" : "false") + << ",\"powerBudgetW\":" << fusion.power_budget_w + << ",\"updatedAtSeconds\":" << fusion_secs << "}}"; + return json.str(); } -private: - PrometheusExporter() = default; - ~PrometheusExporter() { stop(); } - PrometheusExporter(const PrometheusExporter &) = delete; - PrometheusExporter &operator=(const PrometheusExporter &) = delete; + bool readiness_ok() const { + auto controller = observability::TelemetryState::instance().controller_snapshot(); + auto fusion = observability::TelemetryState::instance().fusion_snapshot(); + auto now = std::chrono::system_clock::now(); + bool controller_recent = controller.updated_at.time_since_epoch().count() != 0 && + (now - controller.updated_at) <= std::chrono::seconds(5); + bool fusion_recent = fusion.updated_at.time_since_epoch().count() != 0 && + (now - fusion.updated_at) <= std::chrono::seconds(5); + bool healthy = controller_recent && fusion_recent && !controller.fallback_active && fusion.running && !fusion.degraded; + return healthy; + } + + bool health_ok() const { + auto controller = observability::TelemetryState::instance().controller_snapshot(); + auto fusion = observability::TelemetryState::instance().fusion_snapshot(); + return !controller.fallback_active && fusion.running && !fusion.degraded; + } + + static std::string build_response(const std::string &status, + const std::string &content_type, + const std::string &body, + const std::vector> &headers = {}) { + std::ostringstream response; + response << "HTTP/1.1 " << status << "\r\n"; + response << "Content-Type: " << content_type << "\r\n"; + response << "Content-Length: " << body.size() << "\r\n"; + response << "Cache-Control: no-cache\r\n"; + for (const auto &header : headers) { + response << header.first << ": " << header.second << "\r\n"; + } + response << "Connection: close\r\n\r\n"; + response << body; + return response.str(); + } void serve() { while (true) { @@ -280,64 +597,138 @@ class PrometheusExporter { } continue; } - 
handle_client(client); - ::close(client); + if (use_tls_ && ssl_ctx_) { + SSL *ssl = SSL_new(ssl_ctx_); + if (!ssl) { + ::close(client); + continue; + } + SSL_set_fd(ssl, client); + if (SSL_accept(ssl) <= 0) { + SSL_free(ssl); + ::close(client); + continue; + } + handle_client(client, ssl); + SSL_shutdown(ssl); + SSL_free(ssl); + ::close(client); + } else { + handle_client(client, nullptr); + ::close(client); + } } } - void handle_client(int client) { - char buffer[512]; - ssize_t n = ::recv(client, buffer, sizeof(buffer) - 1, 0); - if (n <= 0) { + void handle_client(int client, SSL *ssl) { + std::string request_payload = read_request_payload(client, ssl); + if (request_payload.empty()) { + return; + } + auto parsed = parse_request(request_payload); + if (!parsed) { + std::string response = build_response("400 Bad Request", "text/plain", "bad request\n"); + send_payload(client, ssl, response); return; } - buffer[n] = '\0'; - std::string request(buffer); - if (request.rfind("GET /metrics", 0) != 0) { - static const char kNotFound[] = - "HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"; - (void)::send(client, kNotFound, sizeof(kNotFound) - 1, 0); + const HttpRequest &request = *parsed; + if (request.method != "GET") { + std::string response = build_response("405 Method Not Allowed", "text/plain", "method not allowed\n"); + send_payload(client, ssl, response); return; } - std::string body = build_metrics(); - std::ostringstream response; - response << "HTTP/1.1 200 OK\r\n"; - response << "Content-Type: text/plain; version=0.0.4\r\n"; - response << "Content-Length: " << body.size() << "\r\n"; - response << "Cache-Control: no-cache\r\n"; - response << "Connection: close\r\n\r\n"; - response << body; - const std::string &payload = response.str(); - size_t total_sent = 0; - while (total_sent < payload.size()) { - ssize_t sent = ::send(client, payload.data() + total_sent, payload.size() - total_sent, 0); - if (sent <= 0) { - break; - } - total_sent 
+= static_cast(sent); + if (!authorized(request)) { + std::vector> headers = {{"WWW-Authenticate", "Basic realm=\"metrics\""}}; + std::string response = build_response("401 Unauthorized", "text/plain", "unauthorized\n", headers); + send_payload(client, ssl, response); + return; + } + + if (request.path == "/metrics") { + std::string body = MetricsRegistry::instance().build_prometheus(); + std::string response = build_response("200 OK", "text/plain; version=0.0.4", body); + send_payload(client, ssl, response); + return; + } + + if (request.path == "/healthz") { + bool ok = health_ok(); + std::string body = build_health_json(); + std::string response = build_response(ok ? "200 OK" : "503 Service Unavailable", "application/json", body); + send_payload(client, ssl, response); + return; } + + if (request.path == "/readyz") { + bool ok = readiness_ok(); + std::string body = build_health_json(); + std::string response = build_response(ok ? "200 OK" : "503 Service Unavailable", "application/json", body); + send_payload(client, ssl, response); + return; + } + + std::string response = build_response("404 Not Found", "text/plain", "not found\n"); + send_payload(client, ssl, response); } mutable std::mutex server_mutex_; - mutable std::mutex metrics_mutex_; std::thread server_thread_; + bool running_{false}; int listen_fd_{-1}; uint16_t listen_port_{0}; - std::string bind_address_; - bool running_{false}; - std::map patch_counts_; - std::map dwell_stats_; - std::map sensor_state_; + std::string bind_address_{"127.0.0.1"}; + ExporterConfig config_{}; + bool use_tls_{false}; + SSL_CTX *ssl_ctx_{nullptr}; }; } // namespace extern "C" { +int tsd_metrics_exporter_start_with_config(const tsd_metrics_exporter_config_t *config) { + ExporterConfig exporter_config; + if (config) { + if (config->bind_address) { + exporter_config.bind_address = config->bind_address; + } + exporter_config.port = config->port; + if (config->tls) { + exporter_config.tls.enabled = true; + if 
(config->tls->certificate_path) { + exporter_config.tls.certificate = config->tls->certificate_path; + } + if (config->tls->private_key_path) { + exporter_config.tls.key = config->tls->private_key_path; + } + if (config->tls->ca_certificate_path) { + exporter_config.tls.ca = config->tls->ca_certificate_path; + } + exporter_config.tls.require_client_auth = config->tls->require_client_auth != 0; + } + if (config->basic_auth) { + exporter_config.auth.enabled = true; + if (config->basic_auth->username) { + exporter_config.auth.username = config->basic_auth->username; + } + if (config->basic_auth->password) { + exporter_config.auth.password = config->basic_auth->password; + } + } + if (config->statsd_host) { + exporter_config.statsd_host = config->statsd_host; + } + exporter_config.statsd_port = config->statsd_port; + } + return PrometheusExporter::instance().start(exporter_config); +} + int tsd_metrics_exporter_start(const char *bind_address, uint16_t port) { - std::string address = bind_address ? 
std::string(bind_address) : std::string(); - return PrometheusExporter::instance().start(address, port); + tsd_metrics_exporter_config_t config{}; + config.bind_address = bind_address; + config.port = port; + return tsd_metrics_exporter_start_with_config(&config); } void tsd_metrics_exporter_stop(void) { @@ -349,11 +740,11 @@ uint16_t tsd_metrics_exporter_listen_port(void) { } void tsd_metrics_exporter_record_patch(simd_width_t from, simd_width_t to, int rc, uint64_t dwell_ms) { - PrometheusExporter::instance().record_patch(from, to, rc, dwell_ms); + PrometheusExporter::instance().registry().record_patch(from, to, rc, dwell_ms); } void tsd_metrics_exporter_observe_dwell(simd_width_t width, uint64_t dwell_ms) { - PrometheusExporter::instance().observe_dwell(width, dwell_ms); + PrometheusExporter::instance().registry().observe_dwell(width, dwell_ms); } void tsd_metrics_exporter_record_sensor_health(const char *sensor_name, @@ -364,7 +755,7 @@ void tsd_metrics_exporter_record_sensor_health(const char *sensor_name, if (!sensor_name) { return; } - PrometheusExporter::instance().record_sensor_health(sensor_name, socket, health, quality, valid != 0); + PrometheusExporter::instance().registry().record_sensor_health(sensor_name, socket, health, quality, valid != 0); } } // extern "C" diff --git a/src/observability/statsd_exporter.cpp b/src/observability/statsd_exporter.cpp new file mode 100644 index 0000000..3d264ab --- /dev/null +++ b/src/observability/statsd_exporter.cpp @@ -0,0 +1,94 @@ +#include + +#include +#include +#include + +#include +#include +#include + +namespace observability { + +StatsdExporter &StatsdExporter::instance() { + static StatsdExporter exporter; + return exporter; +} + +StatsdExporter::StatsdExporter() : socket_(-1), configured_(false), destination_{}, destination_len_(0) {} + +StatsdExporter::~StatsdExporter() { shutdown(); } + +void StatsdExporter::configure(const std::string &host, uint16_t port) { + std::lock_guard lock(mutex_); + if (socket_ 
>= 0) { + ::close(socket_); + socket_ = -1; + } + configured_ = false; + destination_len_ = 0; + + if (host.empty() || port == 0) { + return; + } + + struct addrinfo hints {}; + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + + struct addrinfo *result = nullptr; + std::string port_string = std::to_string(port); + int rc = ::getaddrinfo(host.c_str(), port_string.c_str(), &hints, &result); + if (rc != 0 || !result) { + return; + } + + socket_ = ::socket(result->ai_family, result->ai_socktype, result->ai_protocol); + if (socket_ < 0) { + ::freeaddrinfo(result); + return; + } + + std::memset(&destination_, 0, sizeof(destination_)); + std::memcpy(&destination_, result->ai_addr, result->ai_addrlen); + destination_len_ = static_cast(result->ai_addrlen); + configured_ = true; + + ::freeaddrinfo(result); +} + +void StatsdExporter::shutdown() { + std::lock_guard lock(mutex_); + if (socket_ >= 0) { + ::close(socket_); + socket_ = -1; + } + configured_ = false; + destination_len_ = 0; +} + +void StatsdExporter::send_counter(const std::string &name, uint64_t value) { + std::ostringstream stream; + stream << name << ":" << value << "|c"; + send(stream.str()); +} + +void StatsdExporter::send_gauge(const std::string &name, double value) { + std::ostringstream stream; + stream.setf(std::ios::fixed); + stream.precision(3); + stream << name << ":" << value << "|g"; + send(stream.str()); +} + +void StatsdExporter::send(const std::string &payload) { + std::lock_guard lock(mutex_); + if (!configured_ || socket_ < 0) { + return; + } + (void)::sendto(socket_, payload.data(), payload.size(), 0, + reinterpret_cast(&destination_), destination_len_); +} + +} // namespace observability + diff --git a/src/observability/telemetry_state.cpp b/src/observability/telemetry_state.cpp new file mode 100644 index 0000000..9dd86af --- /dev/null +++ b/src/observability/telemetry_state.cpp @@ -0,0 +1,66 @@ +#include + +#include +#include + +namespace observability { + +TelemetryState 
&TelemetryState::instance() { + static TelemetryState state; + return state; +} + +void TelemetryState::update_controller(const tsd_controller_telemetry_t *telemetry) { + if (!telemetry) { + return; + } + std::lock_guard lock(mutex_); + controller_.fallback_active = telemetry->fallback_active != 0; + controller_.current_width = telemetry->current_width; + controller_.recommended_width = telemetry->recommended_width; + controller_.issued_change = telemetry->issued_change != 0; + controller_.updated_at = std::chrono::system_clock::now(); +} + +void TelemetryState::update_fusion(const tsd_fusion_telemetry_t *telemetry) { + if (!telemetry) { + return; + } + std::lock_guard lock(mutex_); + fusion_.running = telemetry->running != 0; + fusion_.degraded = telemetry->degraded != 0; + fusion_.temp_available = telemetry->temp_available != 0; + fusion_.package_temp_c = telemetry->package_temp_c; + fusion_.freq_available = telemetry->freq_available != 0; + fusion_.freq_ratio = telemetry->freq_ratio; + fusion_.cpi_available = telemetry->cpi_available != 0; + fusion_.thermal_cpi = telemetry->thermal_cpi; + fusion_.power_available = telemetry->power_available != 0; + fusion_.power_budget_w = telemetry->power_budget_w; + fusion_.updated_at = std::chrono::system_clock::now(); +} + +ControllerTelemetrySnapshot TelemetryState::controller_snapshot() const { + std::lock_guard lock(mutex_); + return controller_; +} + +FusionTelemetrySnapshot TelemetryState::fusion_snapshot() const { + std::lock_guard lock(mutex_); + return fusion_; +} + +} // namespace observability + +extern "C" { + +void tsd_observability_update_controller(const tsd_controller_telemetry_t *telemetry) { + observability::TelemetryState::instance().update_controller(telemetry); +} + +void tsd_observability_update_fusion(const tsd_fusion_telemetry_t *telemetry) { + observability::TelemetryState::instance().update_fusion(telemetry); +} + +} // extern "C" + diff --git a/src/policy/dispatcher_policy.cpp 
b/src/policy/dispatcher_policy.cpp index 850a1e7..f3b10a0 100644 --- a/src/policy/dispatcher_policy.cpp +++ b/src/policy/dispatcher_policy.cpp @@ -2,6 +2,8 @@ #include +#include + #include "mpc_controller.h" namespace { @@ -14,6 +16,18 @@ struct DispatcherPolicyState { DispatcherPolicyState() : config{}, fallback_active(false), controller(nullptr) {} }; +void publish_state(const DispatcherPolicyState &state, + simd_width_t current, + simd_width_t recommended, + bool changed) { + tsd_controller_telemetry_t telemetry{}; + telemetry.fallback_active = state.fallback_active ? 1 : 0; + telemetry.current_width = current; + telemetry.recommended_width = recommended; + telemetry.issued_change = changed ? 1 : 0; + tsd_observability_update_controller(&telemetry); +} + } // namespace extern "C" { @@ -31,6 +45,7 @@ tsd_dispatcher_policy_state* tsd_dispatcher_policy_create(const tsd_policy_confi } state->controller = std::make_unique(state->config); state->fallback_active = false; + publish_state(*state, SIMD_SSE41, SIMD_SSE41, false); } catch (...) { delete state; return nullptr; @@ -59,11 +74,13 @@ void tsd_dispatcher_policy_reset(tsd_dispatcher_policy_state *opaque, const tsd_ state->controller = std::make_unique(state->config); } catch (...) { state->fallback_active = true; + publish_state(*state, SIMD_SSE41, SIMD_SSE41, false); return; } } state->controller->reset(state->config); state->fallback_active = false; + publish_state(*state, SIMD_SSE41, SIMD_SSE41, false); } void tsd_dispatcher_policy_record(tsd_dispatcher_policy_state *opaque, @@ -78,6 +95,7 @@ void tsd_dispatcher_policy_record(tsd_dispatcher_policy_state *opaque, state->controller = std::make_unique(state->config); } catch (...) 
{ state->fallback_active = true; + publish_state(*state, width, width, false); return; } } @@ -100,6 +118,7 @@ int tsd_dispatcher_policy_recommend(tsd_dispatcher_policy_state *opaque, if (fallback_active) { *fallback_active = 1; } + publish_state(*state, current_width, current_width, false); return 0; } if (!state->controller) { @@ -110,6 +129,7 @@ int tsd_dispatcher_policy_recommend(tsd_dispatcher_policy_state *opaque, if (fallback_active) { *fallback_active = 1; } + publish_state(*state, current_width, current_width, false); return 0; } } @@ -124,6 +144,7 @@ int tsd_dispatcher_policy_recommend(tsd_dispatcher_policy_state *opaque, if (out_width) { *out_width = target; } + publish_state(*state, current_width, target, changed); return 1; } @@ -133,6 +154,7 @@ void tsd_dispatcher_policy_force_fallback(tsd_dispatcher_policy_state *opaque) { } auto *state = reinterpret_cast(opaque); state->fallback_active = true; + publish_state(*state, SIMD_SSE41, SIMD_SSE41, false); } } // extern "C" diff --git a/src/telemetry/fusion.cpp b/src/telemetry/fusion.cpp index a94e538..bb2a0ec 100644 --- a/src/telemetry/fusion.cpp +++ b/src/telemetry/fusion.cpp @@ -3,6 +3,8 @@ #include #include +#include + #include namespace telemetry { @@ -64,6 +66,9 @@ void TelemetryFusion::start() { } std::lock_guard lock(thread_mutex_); thread_ = std::thread([this] { run(); }); + tsd_fusion_telemetry_t telemetry{}; + telemetry.running = 1; + tsd_observability_update_fusion(&telemetry); } void TelemetryFusion::stop() { @@ -77,6 +82,9 @@ void TelemetryFusion::stop() { thread_.join(); } } + tsd_fusion_telemetry_t telemetry{}; + telemetry.running = 0; + tsd_observability_update_fusion(&telemetry); } bool TelemetryFusion::running() const { return running_.load(); } @@ -128,6 +136,18 @@ TelemetrySnapshot TelemetryFusion::fuse(std::chrono::steady_clock::time_point no if (!snapshot.temp_available || !snapshot.freq_available || !snapshot.cpi_available) { snapshot.degraded = true; } + tsd_fusion_telemetry_t 
telemetry{}; + telemetry.running = running_.load() ? 1 : 0; + telemetry.degraded = snapshot.degraded ? 1 : 0; + telemetry.temp_available = snapshot.temp_available ? 1 : 0; + telemetry.package_temp_c = snapshot.package_temp_c; + telemetry.freq_available = snapshot.freq_available ? 1 : 0; + telemetry.freq_ratio = snapshot.freq_ratio; + telemetry.cpi_available = snapshot.cpi_available ? 1 : 0; + telemetry.thermal_cpi = snapshot.thermal_cpi; + telemetry.power_available = snapshot.power_available ? 1 : 0; + telemetry.power_budget_w = snapshot.power_budget_w; + tsd_observability_update_fusion(&telemetry); return snapshot; } diff --git a/tests/observability/certs/ca.crt b/tests/observability/certs/ca.crt new file mode 100644 index 0000000..cc5f5f7 --- /dev/null +++ b/tests/observability/certs/ca.crt @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDDTCCAfWgAwIBAgIUJpwKEApAZR5Y+LqHXgvwfcI429cwDQYJKoZIhvcNAQEL +BQAwFjEUMBIGA1UEAwwLVFNEIFRlc3QgQ0EwHhcNMjUxMDI5MTAxODM1WhcNMjYx +MDI5MTAxODM1WjAWMRQwEgYDVQQDDAtUU0QgVGVzdCBDQTCCASIwDQYJKoZIhvcN +AQEBBQADggEPADCCAQoCggEBAPrnRKjiuHk6lB12b7YJnN+HDQBHQgfnpMw5zICG +pqrMGV9SKoihlDFi2oZYwYvZ3uW+XPBdZFPALnYLwDMGD98esr8JKgc7iqSagX1y +oBQJ4gR2tVYqUjYgVO7PtUHGPaso4ec2ps4PIVQuYcdRSGekLbbuzD/n4vjUyY7H +ksqcdV+MaU+zsiH3ymwEd+Zjih1tSe/ULYkZYX5gdgTvFMQ0PVUOeEbIWS5J1PTf +S6etTwOf+SzbKT2oINuCoKGbB+MpL5zXqFW1i80nO2Afu2oQ2IP07m32L/vs7TA5 +9EUrCRyeU+4t5RbZSvVYLzKZIBz6BZ3qGkkTvVW1E/lnST0CAwEAAaNTMFEwHQYD +VR0OBBYEFLQ6cF64bK0sE6SUTR+htpalw+KAMB8GA1UdIwQYMBaAFLQ6cF64bK0s +E6SUTR+htpalw+KAMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEB +ANi4E5ulsFGci9SipuEIel3W5ArTSA9YVKy1ttuPixtcOqsk93kMAqxkDm4iAZx6 +9mDDufggTGmPLSFme8wde3QjxUcn8F7eBtDVYLHK7NCGGuCwwU6NWrHLZhak37yY +DYkQxw/IKo7VEkjpPvtt0T4mhCuEkoAzPxgqZCnGqS5O4DMKg44GawS7uYJy+8pS +wbovtGIGMKSqeDIj9JC/w2ezPQno6RLAOFllXP+DcgosOni4QPK3+nxABEZqDa35 +kZDXIiu8G+tuIx6vscfZTQdBhjHudXLCn1pX8CK4z2cEplIbPtuGmASWRxwW/SoG +4J/BUKfLuEpaQOgnKDMd62s= +-----END CERTIFICATE----- diff --git 
a/tests/observability/certs/server.crt b/tests/observability/certs/server.crt new file mode 100644 index 0000000..58c2326 --- /dev/null +++ b/tests/observability/certs/server.crt @@ -0,0 +1,17 @@ +-----BEGIN CERTIFICATE----- +MIICsTCCAZkCFAclq+rffN2+MekvWvsutIY9HMNoMA0GCSqGSIb3DQEBCwUAMBYx +FDASBgNVBAMMC1RTRCBUZXN0IENBMB4XDTI1MTAyOTEwMTg0MFoXDTI2MTAyOTEw +MTg0MFowFDESMBAGA1UEAwwJMTI3LjAuMC4xMIIBIjANBgkqhkiG9w0BAQEFAAOC +AQ8AMIIBCgKCAQEA+EYm7OVb6XYcoIKA525DNZD7HfLB7IHrc6cVUFmYzUyyq4GV +jdbbgmKC5NGY3aMAeZbKTi35Zgfv3R8ppiWrma7HaDEPKtHAagcMI99HiwGu0bTi +ss+AA+2Che6T5W6/ZAUNkKNZ3NygwsOKz7UceaBoDhEgL/dHWLRUHvRlqAGNH4wS +jmEr9O4bGvDfBaMCou41UQhGQxqSiVbvajVI6qtMzd/NOhnN+0DBdMIM/jSF2TuF +CSM1ETzPKrw1Xa2lghCXj2iStK0XldKD4m2cO342O8BAnwmU+TWwek5I/hrCQcFl +cKTK4hqFw0PAFdkfBLggmkhEu0k8bD80yggSGQIDAQABMA0GCSqGSIb3DQEBCwUA +A4IBAQCUzr9tySEjEUJ+y17UUe2Ii+F22cB3mWLG8wHJMJTfEJZl0n8P2QUHx7z1 +pa+xJwoxFKyetjsCJjpBdmkVpZJbFQzDPGoAr0HzvR1OjxRYi/rqe3Uvl7hpF1GC +yfItU4PYMDdb9laVbzbPzKcp/gxikbszsuwR0DHkZ/7FeCpaoAJoIqAF/PWOq9sq +yFFjS/4K303eV5yJs4vv7wqtrTGN7e3Wmd1QHtM9YmWv9m7rIo5W6Mcrgzz0r9kT +rZwwAeOGPaF6et9QU3W0Bg8NK8afTFU7X8V2TOpXtuKY5EafQEmBoJIt0OtOl6om +pcp9j3wFy3ZZXn295vZ2qfxoU/Pm +-----END CERTIFICATE----- diff --git a/tests/observability/certs/server.key b/tests/observability/certs/server.key new file mode 100644 index 0000000..0d2771e --- /dev/null +++ b/tests/observability/certs/server.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQD4Ribs5Vvpdhyg +goDnbkM1kPsd8sHsgetzpxVQWZjNTLKrgZWN1tuCYoLk0ZjdowB5lspOLflmB+/d +HymmJauZrsdoMQ8q0cBqBwwj30eLAa7RtOKyz4AD7YKF7pPlbr9kBQ2Qo1nc3KDC +w4rPtRx5oGgOESAv90dYtFQe9GWoAY0fjBKOYSv07hsa8N8FowKi7jVRCEZDGpKJ +Vu9qNUjqq0zN3806Gc37QMF0wgz+NIXZO4UJIzURPM8qvDVdraWCEJePaJK0rReV +0oPibZw7fjY7wECfCZT5NbB6Tkj+GsJBwWVwpMriGoXDQ8AV2R8EuCCaSES7STxs +PzTKCBIZAgMBAAECggEAHMjTYAFaheeD+6cNx2c9AxOfTbJUwfUxJWxAeBufL+Q9 +zc0dGAAKXKRLvThAwyeQGUi+hVBmuynd6TLg+wqgidWd+GpBsir15kURZwJO0uK7 
+0EPqyaTtmGb7zEfkHURZC8FbOIL72vO4bsRJjnnWxpIupjzdkLoYa0QjAaC/vXnQ +G6CMu01HYZm7Pz1Ss5w10c9v14k4tRVFmuMptug63rZbdXFdPxX2qxuy6sK24p61 +vF5qRtTa/mKnt5701ro3HpIGVASB+xzii8XtNPHxPNgFQdCEbyRxlMMAq2nHZWxq +MxtZhUNKX/SGFVE42SNW7PWqyF+lHPux/7henId9wQKBgQD8ZUUCND3kkxGBiqCk +5RwIlwM0wP2uQdWmk33wMUa/7QRgt0ypHHmTfgitO7y4pGMPGld6vC5y5I0BVp2t +gCtoEOsRgmtL45xOtvQ87SPq5NyXTlqgGALUlblR8bPlpF8Q03Hk2vQY+KHJKxJV +3fWzD1OfEUxGVRlpu+mFVypOIQKBgQD70dCGAEOd8OWtMt3JLU7SsZKffzxBhjXD +LeTE1PPmT6RoCgwdOA7/xSFv6IsKIg4Hr5TEPP+c+ptmayvdnRIviM9xrrmJ+kAG +G9wuNzn4IeczIqvs5tm29NSEqK34NAehcX4nJ25icPNQsWJjmBkOo3nTKTNVT8vo ++pijD9eU+QKBgQDQTqBy3j0KzlqPaKOHFFRbvo711iZI0KsNjQNO3sx+kJahYAcU +N50bntR4bSon1fyvtVTnxjIrvvelWCBQ1sx8WiI4VERL4PHbgWH4etu+/N4WvqNL +KCYF4ACQYgTQjiWr1GdWs2LszR9x0WuTlNo4y3h/ex5RmeCchGuKElOnIQKBgGFa +34w+4+VW8eTBCp+xZYztW+tP7ILEmVppDs3OKrE6zyvt3nsZZisaEnBKbh6vxc7Q +4enwTz5oYNX6zw2xoQtBkDm6PD8Mwd3avYgcj8vPh6vsVp2TLk6eXt8eu4t+yghv +cT8/7lweoOLsgUZR0jYnm3y4rADqhcH/yC0afdU5AoGBAOyUVViP+TlzX7Ttjxwh +92fi2OaueSbMgR1oCJXT5pEkSD/LuV3g4DWyO+hE8hVYYzkla6eaUI+Yt1noQomn +aVDjwbUphNygT+gN9TJXp/RwaegebC+rZyJg4oelz6U+WLp80atbPZAV3WkQ6PQA +zIx/xBx2QnSZDH2tBhK58kzA +-----END PRIVATE KEY----- diff --git a/tests/observability/test_metrics_exporter.cpp b/tests/observability/test_metrics_exporter.cpp new file mode 100644 index 0000000..e11a6a8 --- /dev/null +++ b/tests/observability/test_metrics_exporter.cpp @@ -0,0 +1,281 @@ +#include +#include + +#include + +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace { + +void fail(const std::string &message) { + std::cerr << "test failure: " << message << std::endl; + std::exit(1); +} + +struct StatsdCapture { + int socket{-1}; + uint16_t port{0}; + std::future future; + std::thread thread; +}; + +StatsdCapture start_statsd_capture() { + StatsdCapture capture; + capture.socket = ::socket(AF_INET, SOCK_DGRAM, 0); + if (capture.socket < 0) { + fail("unable to create 
statsd socket"); + } + sockaddr_in addr{}; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + addr.sin_port = htons(0); + if (::bind(capture.socket, reinterpret_cast(&addr), sizeof(addr)) != 0) { + fail("bind statsd socket failed"); + } + sockaddr_in actual{}; + socklen_t len = sizeof(actual); + if (::getsockname(capture.socket, reinterpret_cast(&actual), &len) != 0) { + fail("getsockname statsd socket failed"); + } + capture.port = ntohs(actual.sin_port); + + struct timeval tv {5, 0}; + (void)::setsockopt(capture.socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + + std::promise promise; + capture.future = promise.get_future(); + int fd = capture.socket; + capture.thread = std::thread([fd, p = std::move(promise)]() mutable { + char buffer[512]; + ssize_t n = ::recv(fd, buffer, sizeof(buffer) - 1, 0); + if (n > 0) { + buffer[n] = '\0'; + p.set_value(std::string(buffer, static_cast(n))); + } else { + p.set_value(std::string()); + } + }); + return capture; +} + +void stop_statsd_capture(StatsdCapture &capture) { + if (capture.socket >= 0) { + ::close(capture.socket); + capture.socket = -1; + } + if (capture.thread.joinable()) { + capture.thread.join(); + } +} + +struct HttpResponse { + int status{0}; + std::string body; +}; + +HttpResponse https_request(uint16_t port, + const std::string &path, + const std::string &auth_header, + const std::string &ca_path) { + SSL_CTX *ctx = SSL_CTX_new(TLS_client_method()); + if (!ctx) { + fail("unable to create SSL context"); + } + if (SSL_CTX_load_verify_locations(ctx, ca_path.c_str(), nullptr) <= 0) { + fail("unable to load CA"); + } + SSL *ssl = SSL_new(ctx); + if (!ssl) { + fail("unable to allocate SSL"); + } + int fd = ::socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) { + fail("unable to create tcp socket"); + } + sockaddr_in addr{}; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + if (::inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr) != 1) { + fail("inet_pton failed"); + } + if 
(::connect(fd, reinterpret_cast(&addr), sizeof(addr)) != 0) { + fail("connect failed"); + } + SSL_set_fd(ssl, fd); + SSL_set_tlsext_host_name(ssl, "127.0.0.1"); + if (SSL_connect(ssl) <= 0) { + fail("SSL_connect failed"); + } + std::ostringstream request; + request << "GET " << path << " HTTP/1.1\r\n"; + request << "Host: 127.0.0.1\r\n"; + request << "Connection: close\r\n"; + if (!auth_header.empty()) { + request << "Authorization: " << auth_header << "\r\n"; + } + request << "\r\n"; + std::string req = request.str(); + if (SSL_write(ssl, req.data(), static_cast(req.size())) <= 0) { + fail("SSL_write failed"); + } + std::string response; + char buffer[1024]; + int n = 0; + while ((n = SSL_read(ssl, buffer, sizeof(buffer))) > 0) { + response.append(buffer, n); + } + SSL_shutdown(ssl); + SSL_free(ssl); + ::close(fd); + SSL_CTX_free(ctx); + + HttpResponse parsed; + auto header_end = response.find("\r\n\r\n"); + if (header_end == std::string::npos) { + fail("malformed HTTP response"); + } + std::string headers = response.substr(0, header_end); + parsed.body = response.substr(header_end + 4); + std::istringstream header_stream(headers); + std::string status_line; + std::getline(header_stream, status_line); + if (!status_line.empty() && status_line.back() == '\r') { + status_line.pop_back(); + } + std::istringstream status_stream(status_line); + std::string http_version; + status_stream >> http_version >> parsed.status; + return parsed; +} + +std::string basic_auth_header(const std::string &user, const std::string &pass) { + std::string token = user + ":" + pass; + static const char table[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + std::string encoded; + int val = 0; + int valb = -6; + for (unsigned char c : token) { + val = (val << 8) + c; + valb += 8; + while (valb >= 0) { + encoded.push_back(table[(val >> valb) & 0x3F]); + valb -= 6; + } + } + if (valb > -6) { + encoded.push_back(table[((val << 8) >> (valb + 8)) & 0x3F]); + } + while 
(encoded.size() % 4) { + encoded.push_back('='); + } + return std::string("Basic ") + encoded; +} + +} // namespace + +int main() { + StatsdCapture capture = start_statsd_capture(); + + const char *cert_dir = "../tests/observability/certs/"; + std::string server_crt = std::string(cert_dir) + "server.crt"; + std::string server_key = std::string(cert_dir) + "server.key"; + std::string ca_crt = std::string(cert_dir) + "ca.crt"; + + tsd_metrics_tls_config_t tls{}; + tls.certificate_path = server_crt.c_str(); + tls.private_key_path = server_key.c_str(); + tls.ca_certificate_path = ca_crt.c_str(); + + tsd_metrics_basic_auth_t auth{}; + auth.username = "observer"; + auth.password = "secret"; + + tsd_metrics_exporter_config_t config{}; + config.bind_address = "127.0.0.1"; + config.port = 0; + config.tls = &tls; + config.basic_auth = &auth; + config.statsd_host = "127.0.0.1"; + config.statsd_port = capture.port; + + if (tsd_metrics_exporter_start_with_config(&config) != 0) { + fail("metrics exporter failed to start"); + } + + uint16_t port = tsd_metrics_exporter_listen_port(); + if (port == 0) { + fail("listen port not assigned"); + } + + tsd_controller_telemetry_t controller{}; + controller.fallback_active = 0; + controller.current_width = SIMD_AVX2; + controller.recommended_width = SIMD_AVX2; + controller.issued_change = 0; + tsd_observability_update_controller(&controller); + + tsd_fusion_telemetry_t fusion{}; + fusion.running = 1; + fusion.degraded = 0; + fusion.temp_available = 1; + fusion.package_temp_c = 63.0; + fusion.freq_available = 1; + fusion.freq_ratio = 0.85; + fusion.cpi_available = 1; + fusion.thermal_cpi = 1.10; + fusion.power_available = 1; + fusion.power_budget_w = 75.0; + tsd_observability_update_fusion(&fusion); + + HttpResponse unauth = https_request(port, "/metrics", "", ca_crt); + if (unauth.status != 401) { + fail("expected 401 for missing credentials"); + } + + HttpResponse metrics = https_request(port, "/metrics", basic_auth_header("observer", 
"secret"), ca_crt); + if (metrics.status != 200) { + fail("expected 200 for metrics"); + } + if (metrics.body.find("tsd_patch_transitions_total") == std::string::npos) { + fail("metrics body missing counter"); + } + + HttpResponse health = https_request(port, "/healthz", basic_auth_header("observer", "secret"), ca_crt); + if (health.status != 200) { + fail("expected healthy response"); + } + HttpResponse ready = https_request(port, "/readyz", basic_auth_header("observer", "secret"), ca_crt); + if (ready.status != 200) { + fail("expected ready response"); + } + + tsd_metrics_exporter_record_patch(SIMD_AVX2, SIMD_AVX512, 0, 5); + + auto status = capture.future.wait_for(std::chrono::seconds(5)); + if (status != std::future_status::ready) { + fail("statsd emission missing"); + } + std::string statsd_payload = capture.future.get(); + if (statsd_payload.find("tsd.patch_transition.avx2.avx512.success") == std::string::npos) { + fail("statsd payload missing transition"); + } + + tsd_metrics_exporter_stop(); + stop_statsd_capture(capture); + return 0; +} +