From 0ecfd47c72397ded3c49865e599f9cefbd479d95 Mon Sep 17 00:00:00 2001 From: SaridakisStamatisChristos <34583142+SaridakisStamatisChristos@users.noreply.github.com> Date: Wed, 29 Oct 2025 10:48:36 +0200 Subject: [PATCH] Add ARX predictive model with reloadable metrics instrumentation --- CMakeLists.txt | 16 ++ README.md | 1 + config/controller_coeffs.json | 9 + docs/controller_coeffs.md | 40 ++++ docs/metrics-endpoints.md | 6 +- docs/predictive-controller.md | 30 ++- docs/sandbox-workflow.md | 2 +- include/thermal/simd/metrics.h | 6 + src/policy/arx_model.cpp | 370 ++++++++++++++++++++++++++++++++ src/policy/arx_model.h | 47 ++++ src/policy/mpc_controller.cpp | 152 ++++++++++++- src/policy/mpc_controller.h | 24 ++- src/runtime_metrics.c | 6 + tests/policy/test_arx_model.cpp | 191 +++++++++++++++++ 14 files changed, 880 insertions(+), 20 deletions(-) create mode 100644 config/controller_coeffs.json create mode 100644 docs/controller_coeffs.md create mode 100644 src/policy/arx_model.cpp create mode 100644 src/policy/arx_model.h create mode 100644 tests/policy/test_arx_model.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 6972cc6..fe1c40c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -35,6 +35,7 @@ set(THERMAL_SIMD_CORE_SOURCES src/thermal_signals.c src/policy/policy_config.c src/policy/dispatcher_policy.cpp + src/policy/arx_model.cpp src/policy/mpc_controller.cpp ) @@ -47,6 +48,10 @@ target_include_directories(thermal_simd_core PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src ) +target_compile_definitions(thermal_simd_core + PRIVATE + TSD_DEFAULT_COEFF_PATH="${CMAKE_CURRENT_SOURCE_DIR}/config/controller_coeffs.json" +) add_executable(thermal_simd src/main.cpp src/thermal_simd.c) target_compile_options(thermal_simd PRIVATE -O2 -pthread -fPIC -mno-avx ${THERMAL_SIMD_DISPATCHER_CPU_FLAGS}) @@ -76,6 +81,10 @@ if(BUILD_TESTING) ${CMAKE_CURRENT_SOURCE_DIR}/src ) target_compile_definitions(thermal_simd_core_tests PRIVATE TSD_ENABLE_TESTS) + target_compile_definitions(thermal_simd_core_tests + PRIVATE + TSD_DEFAULT_COEFF_PATH="${CMAKE_CURRENT_SOURCE_DIR}/config/controller_coeffs.json" + ) add_executable(test_config_parser tests/test_config_parser.c) target_link_libraries(test_config_parser PRIVATE thermal_simd_core_tests) @@ -106,6 +115,13 @@ if(BUILD_TESTING) target_compile_options(test_policy_controller PRIVATE -Wall -Wextra) add_test(NAME policy_controller COMMAND test_policy_controller) + add_executable(test_arx_model tests/policy/test_arx_model.cpp) + target_link_libraries(test_arx_model PRIVATE thermal_simd_core_tests) + target_compile_options(test_arx_model PRIVATE -Wall -Wextra) + target_include_directories(test_arx_model PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src) + target_compile_definitions(test_arx_model PRIVATE TSD_ENABLE_TESTS) + add_test(NAME policy_arx_model COMMAND test_arx_model) + add_executable(test_healthcheck_runtime_flags tests/healthcheck/runtime_flags.cpp) target_link_libraries(test_healthcheck_runtime_flags PRIVATE thermal_simd_core_tests) target_compile_options(test_healthcheck_runtime_flags PRIVATE -Wall -Wextra) diff --git a/README.md b/README.md index 9d9be62..c311e41 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,7 @@ Sensor dropouts automatically trigger exponential back-off retries and emit logs See dedicated docs for subsystem details: - [Predictive Controller](docs/predictive-controller.md) +- [Controller Coefficient Format](docs/controller_coeffs.md) - [Telemetry Fusion](docs/telemetry-fusion.md) - [Metrics Endpoints](docs/metrics-endpoints.md) - [Sandbox Workflow](docs/sandbox-workflow.md) diff --git a/config/controller_coeffs.json b/config/controller_coeffs.json new file mode 100644 index 0000000..54ae2a8 --- /dev/null +++ b/config/controller_coeffs.json @@ -0,0 +1,9 @@ +{ + "bias": 1200.0, + "ar_temperature": [0.85], + "ratio": [-0.35], + "severity": [0.05], + "trimmed_ratio": [0.0], + "ma": 0.25, + "staleness_window_ms": 750 +} diff --git a/docs/controller_coeffs.md b/docs/controller_coeffs.md new file mode 100644 index 0000000..798592c --- /dev/null +++ b/docs/controller_coeffs.md @@ -0,0 +1,40 @@ +# Controller Coefficient File + +The predictive controller ingests coefficients from `config/controller_coeffs.json` (or the path provided via `--coeff-path`). The file is a JSON object with the following fields: + +| Field | Type | Required | Description | +| --- | --- | --- | --- | +| `bias` | number | Yes | Constant term applied to the forecast (millicelsius). | +| `ar_temperature` | array | Yes | Auto-regressive coefficients applied to historical package temperatures. The array length determines the minimum history window. | +| `ratio` | array | No | Coefficients applied to historical SIMD ratio measurements (milli-units). | +| `trimmed_ratio` | array | No | Coefficients applied to the trimmed ratio (if available). | +| `severity` | array | No | Coefficients applied to the severity metric reported in telemetry (milli-units). | +| `ma` | number | No | Moving-average gain applied to the most recent residual (`actual - forecast`). | +| `staleness_window_ms` | number | No | Maximum age (in milliseconds) of telemetry used for prediction. Defaults to 500 ms. | + +Example: + +```json +{ + "bias": 1200.0, + "ar_temperature": [0.85, 0.05], + "ratio": [-0.30], + "severity": [0.04], + "ma": 0.25, + "staleness_window_ms": 750 +} +``` + +## Hot Reload Workflow + +1. Update the JSON file on disk (e.g., write a new revision into the ConfigMap or local path). +2. Send `SIGHUP` to the dispatcher process. The controller marks a reload for the next control tick. +3. On the following recommendation cycle, the controller attempts to parse the file: + - Success increments `predictive_coeff_reload_total` and logs an INFO entry with the new history window and staleness guard. + - Failure increments `predictive_coeff_reload_errors_total`, logs an ERROR entry, and falls back to the previous coefficients or averaging forecast. + +## Validation Tips + +- Use `tests/policy/test_arx_model.cpp` as a reference for crafting deterministic coefficients during development. +- Monitor `predictive_abs_error_millic_total` to evaluate how well the updated coefficients track observed temperatures. +- Pair coefficient adjustments with updates to [Predictive Controller](predictive-controller.md) documentation to keep operational guidance in sync. diff --git a/docs/metrics-endpoints.md b/docs/metrics-endpoints.md index f3c3ea2..fb316d6 100644 --- a/docs/metrics-endpoints.md +++ b/docs/metrics-endpoints.md @@ -21,7 +21,11 @@ The dispatcher exports metrics and health data via a multi-channel strategy tail | Metric | Type | Description | | --- | --- | --- | | `predictive_forecasts_total` | Counter | Forecast cycles executed by the predictive controller. | -| `predictive_downgrades_total` | Counter | Controller-driven SIMD downgrades. | +| `predictive_decisions_total` | Counter | Control loop iterations that issued a predictive decision. | +| `predictive_abs_error_millic_total` | Counter | Accumulated absolute error between forecast and observed temperature. | +| `predictive_stale_samples_total` | Counter | Telemetry snapshots rejected because they exceeded the staleness window. | +| `predictive_coeff_reload_total` | Counter | Successful coefficient reloads (startup and SIGHUP). | +| `predictive_coeff_reload_errors_total` | Counter | Failed attempts to reload the coefficient file. | | `telemetry_snapshots_total` | Counter | Telemetry fusion snapshots published. | | `telemetry_degraded_total` | Counter | Snapshots flagged as degraded due to missing signals. | | `patch_transitions_total` | Counter | Successful SIMD trampoline swaps. | diff --git a/docs/predictive-controller.md b/docs/predictive-controller.md index bafce8f..241c91f 100644 --- a/docs/predictive-controller.md +++ b/docs/predictive-controller.md @@ -18,17 +18,24 @@ The predictive controller combines reactive thermal throttling with a short-hori Each signal is tagged with a monotonic timestamp. Stale signals (>2 intervals) are discarded and treated as unavailable. ## Forecast Model -The controller uses a single-step ARX model: +The controller uses a single-step ARX/ARMAX model implemented in `src/policy/arx_model.cpp` and driven by coefficients stored in `config/controller_coeffs.json` (see [Controller Coefficients](controller_coeffs.md)). The model consumes a sliding window of recent telemetry samples and projects the next package temperature in millicelsius: ``` -T[t+1] = a0 + a1 * T[t] + a2 * CPI[t] + a3 * Freq[t] + a4 * Power[t] +T[t+1] = bias + + Σ φᵢ · T[t-i] + + Σ θᵢ · Ratio[t-i] + + Σ γᵢ · Severity[t-i] + + ψ · ε[t] ``` -- Coefficients `a1..a4` are calibrated offline using lab traces and stored in `config/controller_coeffs.json`. -- The bias `a0` compensates for ambient temperature. -- Missing inputs zero out their coefficients and raise the `predictive_input_gaps` metric. +- `φᵢ`, `θᵢ`, and `γᵢ` are configurable auto-regressive and exogenous coefficients. +- `ψ` is an optional moving-average gain applied to the most recent residual `ε[t] = T[t] - T̂[t]`. +- Missing temperature samples disable the prediction path and fall back to a simple moving average. +- Coefficient files support hot-reload: the controller listens for `SIGHUP` and re-reads `config/controller_coeffs.json` on the next control tick. Successful reloads and failures are logged and exported via metrics. -The forecast produces a projected temperature and CPI value under the current SIMD width. The controller evaluates transitions (`SSE4.1`, `AVX2`, `AVX-512`) and selects the highest width whose projected temperature remains below `temp_ceiling_c - safety_margin_c` and whose CPI ratio is under `up_ratio`. +Telemetry freshness is enforced prior to forecasting. If the latest sample exceeds the configured `staleness_window_ms`, the controller skips predictive evaluation, logs a warning, and records `predictive_stale_samples_total`. + +The forecast produces a projected temperature under the current SIMD width. The controller evaluates transitions (`SSE4.1`, `AVX2`, `AVX-512`) and selects the highest width whose projected temperature remains below `temp_ceiling_c - safety_margin_c` and whose CPI ratio is under `up_ratio`. ## Decision Pipeline 1. **Acquire Inputs:** Pull the latest telemetry fusion snapshot (all `TelemetrySnapshot` values share a generation number). @@ -52,11 +59,12 @@ The forecast produces a projected temperature and CPI value under the current SI | `--predictive-alpha` | EWMA alpha applied to CPI history. | 0.25 | ## Telemetry & Metrics -- `predictive_forecasts_total`: incremented each control tick. -- `predictive_downgrades_total`: decision to reduce SIMD width due to forecast. -- `predictive_input_gaps_total`: missing telemetry inputs for a tick. -- `predictive_emergency_transitions_total`: emergency scalar fallbacks. -- `predictive_coeff_reload_errors_total`: failure to read coefficients on reload. +- `predictive_forecasts_total`: ARX/ARMAX forecasts executed with valid telemetry. +- `predictive_decisions_total`: control decisions driven by the predictive controller. +- `predictive_abs_error_millic_total`: accumulated absolute prediction error in millicelsius. +- `predictive_stale_samples_total`: telemetry snapshots rejected due to staleness. +- `predictive_coeff_reload_total`: successful coefficient reloads (including on startup). +- `predictive_coeff_reload_errors_total`: failures to read or parse the coefficient file. Metrics are exposed through the metrics subsystem documented in [Metrics Endpoints](metrics-endpoints.md). diff --git a/docs/sandbox-workflow.md b/docs/sandbox-workflow.md index 205e24f..06172ee 100644 --- a/docs/sandbox-workflow.md +++ b/docs/sandbox-workflow.md @@ -52,7 +52,7 @@ This workflow describes how to exercise the dispatcher in a non-production sandb ## Exit Criteria - Dispatcher exits 0. -- `artifacts/*/metrics.ndjson` contains expected counters (`predictive_downgrades_total > 0` during spike scenario). +- `artifacts/*/metrics.ndjson` contains expected counters (`predictive_decisions_total > 0` during spike scenario). - No `state=emergency` logs during nominal runs. ## Automation diff --git a/include/thermal/simd/metrics.h b/include/thermal/simd/metrics.h index 0f57e64..2a24f1e 100644 --- a/include/thermal/simd/metrics.h +++ b/include/thermal/simd/metrics.h @@ -22,6 +22,12 @@ typedef enum { TSD_METRIC_PATCH_FAILURES, TSD_METRIC_HEALTH_CHECK_FAILURES, TSD_METRIC_SOFTWARE_TIMEOUT_ESCALATIONS, + TSD_METRIC_PREDICTIVE_FORECASTS, + TSD_METRIC_PREDICTIVE_STALE_SAMPLES, + TSD_METRIC_PREDICTIVE_RELOADS, + TSD_METRIC_PREDICTIVE_RELOAD_ERRORS, + TSD_METRIC_PREDICTIVE_ABS_ERROR_MILLIC, + TSD_METRIC_PREDICTIVE_DECISIONS, TSD_METRIC_COUNT } tsd_metric_counter_t; diff --git a/src/policy/arx_model.cpp b/src/policy/arx_model.cpp new file mode 100644 index 0000000..0801fbb --- /dev/null +++ b/src/policy/arx_model.cpp @@ -0,0 +1,370 @@ +#include "arx_model.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "mpc_controller.h" + +namespace tsd { +namespace policy { + +namespace { + +bool extractNumberToken(const std::string &content, std::size_t &pos, std::string *token) { + std::size_t start = pos; + while (pos < content.size()) { + char ch = content[pos]; + if (std::isdigit(static_cast(ch)) || ch == '-' || ch == '+' || ch == '.' || ch == 'e' || + ch == 'E') { + ++pos; + continue; + } + break; + } + if (pos == start) { + return false; + } + if (token) { + *token = content.substr(start, pos - start); + } + return true; +} + +bool parseDoubleField(const std::string &content, + const std::string &key, + bool required, + double *out, + bool *found, + std::string *error_out) { + if (!out) { + if (error_out) { + *error_out = "missing output pointer"; + } + return false; + } + std::string needle = "\"" + key + "\""; + std::size_t pos = content.find(needle); + if (pos == std::string::npos) { + if (found) { + *found = false; + } + if (required && error_out) { + *error_out = "missing field: " + key; + } + return !required; + } + if (found) { + *found = true; + } + pos = content.find(':', pos + needle.size()); + if (pos == std::string::npos) { + if (error_out) { + *error_out = "malformed field: " + key; + } + return false; + } + ++pos; + while (pos < content.size() && std::isspace(static_cast(content[pos]))) { + ++pos; + } + std::string token; + if (!extractNumberToken(content, pos, &token)) { + if (error_out) { + *error_out = "invalid numeric value for field: " + key; + } + return false; + } + char *endptr = nullptr; + errno = 0; + double value = std::strtod(token.c_str(), &endptr); + if (errno != 0 || !endptr || *endptr != '\0') { + if (error_out) { + *error_out = "failed to parse double for field: " + key; + } + return false; + } + *out = value; + return true; +} + +bool parseUint64Field(const std::string &content, + const std::string &key, + bool required, + std::uint64_t *out, + bool *found, + std::string *error_out) { + double value = 0.0; + bool local_found = false; + if (!parseDoubleField(content, key, required, &value, &local_found, error_out)) { + return false; + } + if (!local_found) { + if (found) { + *found = false; + } + return !required; + } + if (found) { + *found = true; + } + if (value < 0.0) { + if (error_out) { + *error_out = "negative value for field: " + key; + } + return false; + } + *out = static_cast(value + 0.5); + return true; +} + +bool parseDoubleArrayField(const std::string &content, const std::string &key, std::vector *out, + std::string *error_out) { + if (!out) { + if (error_out) { + *error_out = "missing output pointer"; + } + return false; + } + std::string needle = "\"" + key + "\""; + std::size_t pos = content.find(needle); + if (pos == std::string::npos) { + out->clear(); + return true; + } + pos = content.find('[', pos + needle.size()); + if (pos == std::string::npos) { + if (error_out) { + *error_out = "malformed array for field: " + key; + } + return false; + } + ++pos; + out->clear(); + while (pos < content.size()) { + while (pos < content.size() && std::isspace(static_cast(content[pos]))) { + ++pos; + } + if (pos >= content.size()) { + break; + } + if (content[pos] == ']') { + ++pos; + return true; + } + std::size_t start = pos; + while (pos < content.size() && content[pos] != ',' && content[pos] != ']') { + ++pos; + } + std::string token = content.substr(start, pos - start); + std::size_t trimmed_start = token.find_first_not_of(" \t\n\r"); + std::size_t trimmed_end = token.find_last_not_of(" \t\n\r"); + if (trimmed_start == std::string::npos) { + if (error_out) { + *error_out = "empty entry in array for field: " + key; + } + return false; + } + token = token.substr(trimmed_start, trimmed_end - trimmed_start + 1); + char *endptr = nullptr; + errno = 0; + double value = std::strtod(token.c_str(), &endptr); + if (errno != 0 || !endptr || *endptr != '\0') { + if (error_out) { + *error_out = "invalid numeric entry in array for field: " + key; + } + return false; + } + out->push_back(value); + if (pos < content.size() && content[pos] == ',') { + ++pos; + } + } + if (error_out) { + *error_out = "unterminated array for field: " + key; + } + return false; +} + +} // namespace + +ARXModel::ARXModel() + : bias_(0.0), + ma_coefficient_(0.0), + ma_enabled_(false), + last_residual_(0.0), + residual_valid_(false), + coefficients_loaded_(false), + required_history_(1), + staleness_window_ms_(500) {} + +bool ARXModel::loadFromFile(const std::string &path, std::string *error_out) { + std::ifstream stream(path); + if (!stream.is_open()) { + if (error_out) { + *error_out = "unable to open coefficient file: " + path; + } + coefficients_loaded_ = false; + return false; + } + std::ostringstream buffer; + buffer << stream.rdbuf(); + if (buffer.fail()) { + if (error_out) { + *error_out = "failed to read coefficient file: " + path; + } + coefficients_loaded_ = false; + return false; + } + std::string content = buffer.str(); + if (!parseContent(content, error_out)) { + coefficients_loaded_ = false; + return false; + } + coefficients_loaded_ = true; + residual_valid_ = false; + required_history_ = std::max(1, temperature_coeffs_.size()); + required_history_ = std::max(required_history_, ratio_coeffs_.size()); + required_history_ = std::max(required_history_, severity_coeffs_.size()); + required_history_ = std::max(required_history_, trimmed_ratio_coeffs_.size()); + return true; +} + +bool ARXModel::parseContent(const std::string &content, std::string *error_out) { + double bias = 0.0; + bool bias_found = false; + if (!parseDoubleField(content, "bias", true, &bias, &bias_found, error_out) || !bias_found) { + return false; + } + + std::vector temp_coeffs; + if (!parseDoubleArrayField(content, "ar_temperature", &temp_coeffs, error_out)) { + return false; + } + if (temp_coeffs.empty()) { + if (error_out) { + *error_out = "ar_temperature array must contain at least one coefficient"; + } + return false; + } + + std::vector ratio_coeffs; + if (!parseDoubleArrayField(content, "ratio", &ratio_coeffs, error_out)) { + return false; + } + + std::vector severity_coeffs; + if (!parseDoubleArrayField(content, "severity", &severity_coeffs, error_out)) { + return false; + } + + std::vector trimmed_ratio_coeffs; + if (!parseDoubleArrayField(content, "trimmed_ratio", &trimmed_ratio_coeffs, error_out)) { + return false; + } + + double ma_coeff = 0.0; + bool ma_present = false; + if (!parseDoubleField(content, "ma", false, &ma_coeff, &ma_present, error_out)) { + return false; + } + + std::uint64_t staleness_window = 0; + bool staleness_found = false; + if (!parseUint64Field(content, "staleness_window_ms", false, &staleness_window, &staleness_found, error_out)) { + return false; + } + if (!staleness_found) { + staleness_window = 500; + } + if (staleness_window == 0) { + staleness_window = 1; + } + + bias_ = bias; + temperature_coeffs_ = std::move(temp_coeffs); + ratio_coeffs_ = std::move(ratio_coeffs); + severity_coeffs_ = std::move(severity_coeffs); + trimmed_ratio_coeffs_ = std::move(trimmed_ratio_coeffs); + ma_coefficient_ = ma_coeff; + ma_enabled_ = ma_present; + staleness_window_ms_ = staleness_window; + return true; +} + +double ARXModel::predict(const std::deque &history, bool *ok) const { + if (!coefficients_loaded_) { + if (ok) { + *ok = false; + } + return 0.0; + } + if (history.empty()) { + if (ok) { + *ok = false; + } + return 0.0; + } + std::size_t size = history.size(); + double prediction = bias_; + bool used_temperature = false; + + for (std::size_t i = 0; i < temperature_coeffs_.size() && i < size; ++i) { + const TelemetrySample &sample = history[size - 1 - i]; + if (!sample.temp_valid) { + continue; + } + prediction += temperature_coeffs_[i] * sample.temperature_millic; + used_temperature = true; + } + + for (std::size_t i = 0; i < ratio_coeffs_.size() && i < size; ++i) { + const TelemetrySample &sample = history[size - 1 - i]; + double ratio = sample.trimmed_ratio_milli > 0.0 ? sample.trimmed_ratio_milli : sample.ratio_milli; + prediction += ratio_coeffs_[i] * ratio; + } + + for (std::size_t i = 0; i < severity_coeffs_.size() && i < size; ++i) { + const TelemetrySample &sample = history[size - 1 - i]; + prediction += severity_coeffs_[i] * sample.severity_milli; + } + + for (std::size_t i = 0; i < trimmed_ratio_coeffs_.size() && i < size; ++i) { + const TelemetrySample &sample = history[size - 1 - i]; + prediction += trimmed_ratio_coeffs_[i] * sample.trimmed_ratio_milli; + } + + if (!used_temperature) { + if (ok) { + *ok = false; + } + return 0.0; + } + + if (ma_enabled_ && residual_valid_) { + prediction += ma_coefficient_ * last_residual_; + } + + if (ok) { + *ok = true; + } + return prediction; +} + +void ARXModel::updateResidual(double residual) { + last_residual_ = residual; + residual_valid_ = true; +} + +void ARXModel::resetResidual() { + last_residual_ = 0.0; + residual_valid_ = false; +} + +} // namespace policy +} // namespace tsd diff --git a/src/policy/arx_model.h b/src/policy/arx_model.h new file mode 100644 index 0000000..b3e0f1b --- /dev/null +++ b/src/policy/arx_model.h @@ -0,0 +1,47 @@ +#ifndef TSD_POLICY_ARX_MODEL_H +#define TSD_POLICY_ARX_MODEL_H + +#include +#include +#include +#include + +namespace tsd { +namespace policy { + +struct TelemetrySample; + +class ARXModel { +public: + ARXModel(); + + bool loadFromFile(const std::string &path, std::string *error_out); + double predict(const std::deque &history, bool *ok) const; + void updateResidual(double residual); + void resetResidual(); + + bool valid() const { return coefficients_loaded_; } + std::size_t requiredHistory() const { return required_history_; } + std::uint64_t stalenessWindowMs() const { return staleness_window_ms_; } + +private: + bool parseContent(const std::string &content, std::string *error_out); + + double bias_; + std::vector temperature_coeffs_; + std::vector ratio_coeffs_; + std::vector severity_coeffs_; + std::vector trimmed_ratio_coeffs_; + double ma_coefficient_; + bool ma_enabled_; + double last_residual_; + bool residual_valid_; + bool coefficients_loaded_; + std::size_t required_history_; + std::uint64_t staleness_window_ms_; +}; + +} // namespace policy +} // namespace tsd + +#endif // TSD_POLICY_ARX_MODEL_H diff --git a/src/policy/mpc_controller.cpp b/src/policy/mpc_controller.cpp index 39d29a9..aad840a 100644 --- a/src/policy/mpc_controller.cpp +++ b/src/policy/mpc_controller.cpp @@ -1,10 +1,17 @@ #include "mpc_controller.h" #include +#include +#include #include #include +#include #include #include +#include + +#include +#include namespace tsd { namespace policy { @@ -14,29 +21,84 @@ constexpr double kMinImprovement = 1.0; constexpr double kRatioTrendWeight = 0.5; constexpr double kTemperatureWeight = 0.001; constexpr double kStabilityMargin = 0.25; +constexpr std::chrono::milliseconds kDefaultStalenessWindow{500}; + +std::atomic &reloadRequestedFlag() { + static std::atomic flag{false}; + return flag; +} + +void handleSighup(int) { + reloadRequestedFlag().store(true, std::memory_order_relaxed); +} + +struct SignalRegistrar { + SignalRegistrar() { + std::signal(SIGHUP, handleSighup); + } +}; + +SignalRegistrar ®istrar() { + static SignalRegistrar instance; + return instance; +} inline int widthIndex(simd_width_t width) { return static_cast(width); } +std::string resolveCoefficientPath() { + (void)registrar(); + const char *env = std::getenv("TSD_PREDICTIVE_COEFF_PATH"); + if (env && *env) { + return std::string(env); + } +#ifdef TSD_DEFAULT_COEFF_PATH + return std::string(TSD_DEFAULT_COEFF_PATH); +#else + return std::string("config/controller_coeffs.json"); +#endif +} + } // namespace MPCController::MPCController() { + coeff_path_ = resolveCoefficientPath(); tsd_policy_config defaults; tsd_policy_config_set_defaults(&defaults); reset(defaults); } MPCController::MPCController(const tsd_policy_config &config) { + coeff_path_ = resolveCoefficientPath(); reset(config); } void MPCController::reset(const tsd_policy_config &config) { config_ = config; history_.clear(); + arx_model_.resetResidual(); + staleness_window_ = kDefaultStalenessWindow; + last_prediction_millic_ = 0.0; + last_prediction_valid_ = false; + history_limit_ = std::max(1, config_.forecast_horizon); + if (!loadCoefficients(true)) { + tsd_log_warn("policy", "using fallback averaging forecast due to coefficient load failure"); + } } void MPCController::pushSample(const tsd_thermal_eval_t &sample, simd_width_t width) { + if (last_prediction_valid_ && sample.temp_available) { + double residual = static_cast(sample.package_temp_millic) - last_prediction_millic_; + arx_model_.updateResidual(residual); + double abs_residual = std::fabs(residual); + std::uint64_t scaled = static_cast(abs_residual + 0.5); + tsd_metrics_add(TSD_METRIC_PREDICTIVE_ABS_ERROR_MILLIC, scaled); + tsd_log_debug("policy", "forecast residual=%.2f (actual=%d predicted=%.2f)", residual, + sample.package_temp_millic, last_prediction_millic_); + } + last_prediction_valid_ = false; + TelemetrySample entry{}; entry.ratio_milli = static_cast(sample.ratio_milli); entry.trimmed_ratio_milli = static_cast(sample.trimmed_ratio_milli); @@ -44,9 +106,11 @@ void MPCController::pushSample(const tsd_thermal_eval_t &sample, simd_width_t wi entry.temperature_millic = static_cast(sample.package_temp_millic); entry.temp_valid = sample.temp_available != 0; entry.width = width; + entry.timestamp = std::chrono::steady_clock::now(); history_.push_back(entry); - if (config_.forecast_horizon > 0) { - while (history_.size() > static_cast(config_.forecast_horizon)) { + size_t limit = historyLimit(); + if (limit > 0) { + while (history_.size() > limit) { history_.pop_front(); } } @@ -66,11 +130,22 @@ double MPCController::computeForecastRatio(size_t horizon) const { return sum / static_cast(count); } -double MPCController::computeForecastTemperature(size_t horizon, size_t &valid_count) const { +double MPCController::computeForecastTemperature(size_t horizon, size_t &valid_count, bool &used_model) const { if (history_.empty() || horizon == 0) { valid_count = 0; + used_model = false; return 0.0; } + used_model = false; + if (arx_model_.valid()) { + bool ok = false; + double prediction = arx_model_.predict(history_, &ok); + if (ok) { + valid_count = 1; + used_model = true; + return prediction; + } + } double sum = 0.0; valid_count = 0; size_t count = std::min(history_.size(), horizon); @@ -124,15 +199,28 @@ double MPCController::scoreWidth(simd_width_t candidate, bool MPCController::recommend(simd_width_t current_width, simd_width_t max_width, - simd_width_t &out_width) const { + simd_width_t &out_width) { if (history_.empty() || config_.forecast_horizon == 0) { return false; } + maybeReloadCoefficients(); + size_t horizon = std::min(history_.size(), static_cast(config_.forecast_horizon)); if (horizon == 0) { return false; } + auto now = std::chrono::steady_clock::now(); + const TelemetrySample &latest = history_.back(); + auto elapsed = std::chrono::duration_cast(now - latest.timestamp); + if (elapsed > staleness_window_) { + tsd_metrics_increment(TSD_METRIC_PREDICTIVE_STALE_SAMPLES); + tsd_log_warn("policy", "telemetry stale (%lld ms > %lld ms)", + static_cast(elapsed.count()), + static_cast(staleness_window_.count())); + return false; + } + double forecast_ratio = computeForecastRatio(horizon); double first_ratio = history_.front().ratio_milli; double last_ratio = history_.back().ratio_milli; @@ -146,7 +234,13 @@ bool MPCController::recommend(simd_width_t current_width, double ratio_error = forecast_ratio - static_cast(config_.slo_ratio_milli); size_t temp_valid_count = 0; - double forecast_temp = computeForecastTemperature(horizon, temp_valid_count); + bool used_model = false; + double forecast_temp = computeForecastTemperature(horizon, temp_valid_count, used_model); + if (used_model) { + tsd_metrics_increment(TSD_METRIC_PREDICTIVE_FORECASTS); + } + last_prediction_millic_ = forecast_temp; + last_prediction_valid_ = used_model; double best_cost = std::numeric_limits::infinity(); simd_width_t best_width = current_width; @@ -180,9 +274,57 @@ bool MPCController::recommend(simd_width_t current_width, return false; } + tsd_log_info("policy", "decision current=%d target=%d forecast_ratio=%.2f forecast_temp=%.2f", widthIndex(current_width), + widthIndex(best_width), forecast_ratio, forecast_temp); + tsd_metrics_increment(TSD_METRIC_PREDICTIVE_DECISIONS); + out_width = best_width; return true; } +bool MPCController::loadCoefficients(bool log_success) { + std::string error; + if (!arx_model_.loadFromFile(coeff_path_, &error)) { + tsd_log_error("policy", "failed to load coefficients from %s: %s", coeff_path_.c_str(), error.c_str()); + tsd_metrics_increment(TSD_METRIC_PREDICTIVE_RELOAD_ERRORS); + staleness_window_ = kDefaultStalenessWindow; + history_limit_ = std::max(1, config_.forecast_horizon); + return false; + } + long long window_ms = static_cast(arx_model_.stalenessWindowMs()); + if (window_ms <= 0) { + window_ms = kDefaultStalenessWindow.count(); + } + staleness_window_ = std::chrono::milliseconds(window_ms); + history_limit_ = std::max(std::max(1, config_.forecast_horizon), arx_model_.requiredHistory()); + arx_model_.resetResidual(); + tsd_metrics_increment(TSD_METRIC_PREDICTIVE_RELOADS); + if (log_success) { + tsd_log_info("policy", "loaded coefficients from %s (staleness=%lld ms, history=%zu)", coeff_path_.c_str(), + static_cast(staleness_window_.count()), history_limit_); + } + return true; +} + +void MPCController::maybeReloadCoefficients() { + if (reloadRequestedFlag().exchange(false, std::memory_order_relaxed)) { + loadCoefficients(false); + } +} + +size_t MPCController::historyLimit() const { + if (history_limit_ == 0) { + return std::max(1, config_.forecast_horizon); + } + return history_limit_; +} + +#ifdef TSD_ENABLE_TESTS +void MPCController::debugSetCoefficientPath(const std::string &path) { + coeff_path_ = path; + loadCoefficients(false); +} +#endif + } // namespace policy } // namespace tsd diff --git a/src/policy/mpc_controller.h b/src/policy/mpc_controller.h index 0972a93..1f110fa 100644 --- a/src/policy/mpc_controller.h +++ b/src/policy/mpc_controller.h @@ -1,12 +1,16 @@ #ifndef TSD_POLICY_MPC_CONTROLLER_H #define TSD_POLICY_MPC_CONTROLLER_H +#include #include +#include #include #include #include +#include "arx_model.h" + namespace tsd { namespace policy { @@ -17,6 +21,7 @@ struct TelemetrySample { double temperature_millic; bool temp_valid; simd_width_t width; + std::chrono::steady_clock::time_point timestamp; }; class MPCController { @@ -28,12 +33,20 @@ class MPCController { void pushSample(const tsd_thermal_eval_t &sample, simd_width_t width); bool recommend(simd_width_t current_width, simd_width_t max_width, - simd_width_t &out_width) const; + simd_width_t &out_width); size_t sampleCount() const { return history_.size(); } +#ifdef TSD_ENABLE_TESTS + void debugSetCoefficientPath(const std::string &path); + double debugLastPrediction() const { return last_prediction_millic_; } + bool debugPredictionValid() const { return last_prediction_valid_; } +#endif + private: + bool loadCoefficients(bool log_success); + void maybeReloadCoefficients(); double computeForecastRatio(size_t horizon) const; - double computeForecastTemperature(size_t horizon, size_t &valid_count) const; + double computeForecastTemperature(size_t horizon, size_t &valid_count, bool &used_model) const; double scoreWidth(simd_width_t candidate, simd_width_t current, size_t horizon, @@ -41,9 +54,16 @@ class MPCController { double ratio_trend, double forecast_temp, double ratio_error) const; + size_t historyLimit() const; tsd_policy_config config_{}; std::deque history_; + std::string coeff_path_; + ARXModel arx_model_; + std::chrono::milliseconds staleness_window_; + double last_prediction_millic_; + bool last_prediction_valid_; + size_t history_limit_; }; } // namespace policy diff --git a/src/runtime_metrics.c b/src/runtime_metrics.c index 31ce988..65fe5cf 100644 --- a/src/runtime_metrics.c +++ b/src/runtime_metrics.c @@ -44,6 +44,12 @@ const char* tsd_metrics_counter_name(tsd_metric_counter_t id) { case TSD_METRIC_PATCH_FAILURES: return "patch_failures"; case TSD_METRIC_HEALTH_CHECK_FAILURES: return "health_check_failures"; case TSD_METRIC_SOFTWARE_TIMEOUT_ESCALATIONS: return "software_timeout_escalations"; + case TSD_METRIC_PREDICTIVE_FORECASTS: return "predictive_forecasts_total"; + case TSD_METRIC_PREDICTIVE_STALE_SAMPLES: return "predictive_stale_samples_total"; + case TSD_METRIC_PREDICTIVE_RELOADS: return "predictive_coeff_reload_total"; + case TSD_METRIC_PREDICTIVE_RELOAD_ERRORS: return "predictive_coeff_reload_errors_total"; + case TSD_METRIC_PREDICTIVE_ABS_ERROR_MILLIC: return "predictive_abs_error_millic_total"; + case TSD_METRIC_PREDICTIVE_DECISIONS: return "predictive_decisions_total"; case TSD_METRIC_COUNT: return "invalid"; } return "invalid"; diff --git a/tests/policy/test_arx_model.cpp b/tests/policy/test_arx_model.cpp new file mode 100644 index 0000000..aa5a534 --- /dev/null +++ b/tests/policy/test_arx_model.cpp @@ -0,0 +1,191 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "policy/arx_model.h" +#include "policy/mpc_controller.h" + +using tsd::policy::ARXModel; +using tsd::policy::MPCController; +using tsd::policy::TelemetrySample; + +namespace { + +std::filesystem::path writeCoefficients(const std::string &name, const std::string &content) { + auto dir = std::filesystem::temp_directory_path() / "tsd_policy_tests"; + std::filesystem::create_directories(dir); + auto path = dir / name; + std::ofstream stream(path); + stream << content; + stream.close(); + return path; +} + +void populateSample(tsd_thermal_eval_t &sample, uint32_t ratio, int32_t temp_millic) { + sample = tsd_thermal_eval_t{}; + sample.ratio_milli = ratio; + sample.trimmed_ratio_milli = ratio; + sample.severity_milli = ratio; + sample.thermal_severity_milli = ratio; + sample.package_temp_millic = temp_millic; + sample.temp_available = 1; +} + +void test_arx_prediction_basic() { + auto path = writeCoefficients( + "coeff_basic.json", + R"JSON({ + "bias": 100.0, + "ar_temperature": [0.5, 0.1], + "ratio": [0.01], + "severity": [0.02], + "ma": 0.0, + "staleness_window_ms": 250 +})JSON"); + + ARXModel model; + std::string error; + bool ok = model.loadFromFile(path.string(), &error); + assert(ok); + (void)error; + + std::deque history; + TelemetrySample first{}; + first.temperature_millic = 80250.0; + first.temp_valid = true; + first.ratio_milli = 1450.0; + first.trimmed_ratio_milli = 0.0; + first.severity_milli = 1250.0; + first.width = SIMD_AVX2; + first.timestamp = std::chrono::steady_clock::now(); + history.push_back(first); + + TelemetrySample second = first; + second.temperature_millic = 80600.0; + second.ratio_milli = 1500.0; + second.trimmed_ratio_milli = 0.0; + second.severity_milli = 1300.0; + second.timestamp = std::chrono::steady_clock::now(); + history.push_back(second); + + bool prediction_ok = false; + double prediction = model.predict(history, &prediction_ok); + assert(prediction_ok); + + double expected = 100.0 + 0.5 * second.temperature_millic + 0.1 * first.temperature_millic + + 0.01 * second.ratio_milli + 0.02 * second.severity_milli; + assert(std::fabs(prediction - expected) < 1e-3); + assert(model.stalenessWindowMs() == 250); +} + +void test_mpc_staleness_guard() { + auto path = writeCoefficients( + "coeff_stale.json", + R"JSON({ + "bias": 0.0, + "ar_temperature": [1.0], + "ratio": [0.0], + "severity": [0.0], + "ma": 0.0, + "staleness_window_ms": 1 +})JSON"); + + MPCController controller; + tsd_policy_config cfg; + tsd_policy_config_set_defaults(&cfg); + cfg.forecast_horizon = 2; + controller.reset(cfg); + controller.debugSetCoefficientPath(path.string()); + + tsd_metrics_snapshot_t before; + tsd_metrics_snapshot(&before); + + tsd_thermal_eval_t sample{}; + populateSample(sample, 1400, 80000); + controller.pushSample(sample, SIMD_AVX2); + + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + + simd_width_t target = SIMD_AVX2; + bool changed = controller.recommend(SIMD_AVX2, SIMD_AVX512, target); + assert(!changed); + assert(target == SIMD_AVX2); + + tsd_metrics_snapshot_t after; + tsd_metrics_snapshot(&after); + uint64_t stale_delta = after.counters[TSD_METRIC_PREDICTIVE_STALE_SAMPLES] - + before.counters[TSD_METRIC_PREDICTIVE_STALE_SAMPLES]; + assert(stale_delta == 1); +} + +void test_mpc_reload_on_sighup() { + auto base_path = writeCoefficients( + "coeff_reload.json", + R"JSON({ + "bias": 0.0, + "ar_temperature": [0.5], + "ratio": [0.0], + "severity": [0.0], + "ma": 0.0, + "staleness_window_ms": 100 +})JSON"); + + MPCController controller; + tsd_policy_config cfg; + tsd_policy_config_set_defaults(&cfg); + cfg.forecast_horizon = 3; + controller.reset(cfg); + controller.debugSetCoefficientPath(base_path.string()); + + tsd_thermal_eval_t sample{}; + populateSample(sample, 1800, 82000); + controller.pushSample(sample, SIMD_AVX2); + populateSample(sample, 1700, 81500); + controller.pushSample(sample, SIMD_AVX2); + + simd_width_t target = SIMD_AVX2; + controller.recommend(SIMD_AVX2, SIMD_AVX512, target); + double initial_prediction = controller.debugLastPrediction(); + + // Overwrite coefficients with a different bias and stronger gain. + std::ofstream stream(base_path); + stream << R"JSON({ + "bias": 5000.0, + "ar_temperature": [0.9], + "ratio": [0.0], + "severity": [0.0], + "ma": 0.0, + "staleness_window_ms": 100 +})JSON"; + stream.close(); + + std::raise(SIGHUP); + + populateSample(sample, 1650, 81000); + controller.pushSample(sample, SIMD_AVX2); + target = SIMD_AVX2; + controller.recommend(SIMD_AVX2, SIMD_AVX512, target); + double reloaded_prediction = controller.debugLastPrediction(); + + assert(reloaded_prediction > initial_prediction + 1000.0); +} + +} // namespace + +int main() { + test_arx_prediction_basic(); + test_mpc_staleness_guard(); + test_mpc_reload_on_sighup(); + std::printf("policy ARX model tests passed\n"); + return 0; +}