Merged
14 changes: 14 additions & 0 deletions CMakeLists.txt
@@ -27,6 +27,10 @@ set(THERMAL_SIMD_CORE_SOURCES
    src/telemetry/evaluator.cpp
    src/telemetry/history_store.cpp
    src/telemetry/sensors.cpp
    src/telemetry/bus.cpp
    src/telemetry/collector.cpp
    src/telemetry/fusion.cpp
    src/telemetry/fusion_bridge.cpp
    src/observability/metrics.cpp
    src/thermal_config.c
    src/thermal_cpu.c
@@ -132,6 +136,16 @@ if(BUILD_TESTING)
    target_compile_options(test_telemetry PRIVATE -Wall -Wextra)
    add_test(NAME telemetry_adapters COMMAND test_telemetry)

    add_executable(test_telemetry_fusion tests/telemetry/test_fusion_thread.cpp)
    target_link_libraries(test_telemetry_fusion PRIVATE thermal_simd_core_tests pthread)
    target_compile_options(test_telemetry_fusion PRIVATE -Wall -Wextra -pthread)
    add_test(NAME telemetry_fusion_thread COMMAND test_telemetry_fusion)

    add_executable(test_telemetry_fusion_stress tests/telemetry/test_fusion_stress.cpp)
    target_link_libraries(test_telemetry_fusion_stress PRIVATE thermal_simd_core_tests pthread)
    target_compile_options(test_telemetry_fusion_stress PRIVATE -Wall -Wextra -pthread)
    add_test(NAME telemetry_fusion_stress COMMAND test_telemetry_fusion_stress)

    add_executable(test_trampoline_security tests/patcher/test_trampoline_security.cpp)
    target_link_libraries(test_trampoline_security PRIVATE thermal_simd_core_tests pthread)
    target_compile_definitions(test_trampoline_security PRIVATE TSD_ENABLE_TESTS)
78 changes: 78 additions & 0 deletions docs/telemetry.md
@@ -0,0 +1,78 @@
# Telemetry Subsystem Overview

The telemetry subsystem normalizes hardware sensor readings, fuses them into
consistent snapshots, and publishes those frames to the predictive controller.
This document describes the new collector abstractions, the telemetry bus
manager, and the fusion thread that feeds controller consumers.

## Module Layout

The subsystem lives under `src/telemetry/` with public headers in
`include/telemetry/`:

| Path | Description |
| ---- | ----------- |
| `include/telemetry/bus.h` | Shared `TelemetryBus` state and manager facade. |
| `include/telemetry/collector.h` | Collector interfaces plus concrete collectors for perf, MSR, RAPL, frequency, and OEM sources. |
| `include/telemetry/fusion.h` | Ring buffer, fusion thread, and snapshot types. |
| `src/telemetry/bus.cpp` | `TelemetryBus` and manager implementation. |
| `src/telemetry/collector.cpp` | Periodic collector helpers and concrete collectors. |
| `src/telemetry/fusion.cpp` | Fusion thread, snapshot ring buffer, and freshness logic. |
| `src/telemetry/fusion_bridge.cpp` | C bridge that exposes fusion outputs to the legacy runtime. |

The bridge exports the `tsd_telemetry_fusion_*` C helpers declared in
`include/thermal/simd/telemetry_fusion.h` so C modules can start/stop the fusion
thread and fetch fused telemetry samples.

## Collectors & Bus Manager

Each collector implements the `TelemetryCollector` interface and publishes
`TelemetryReading` values onto the shared `TelemetryBus`:

* `PerfCollector` – provides CPI and frequency ratio hints derived from perf
counters.
* `MsrCollector` – ingests package/core temperatures from IA32 thermal MSRs.
* `RaplCollector` – translates RAPL energy deltas into instantaneous power
budgets.
* `FreqCollector` – polls sysfs cpufreq interfaces such as `cpuinfo_cur_freq`
for turbo residency hints.
* `OemCollector` – optionally sources OEM/PMBus temperatures with higher
quality scores.
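
As an illustration of the energy-delta conversion mentioned for `RaplCollector`,
the sketch below derives average power from two raw counter readings. The
helper name `rapl_power_watts`, the 32-bit wrapping counter, and the
caller-supplied joules-per-count scale are assumptions made for this example;
they are not taken from the collector implementation in this change.

```cpp
#include <chrono>
#include <cstdint>
#include <optional>

// Hypothetical helper (not part of this PR): converts two raw energy counter
// readings into the average power over the interval between them.
// Assumes a 32-bit counter that wraps, and a scale in joules per count.
std::optional<double> rapl_power_watts(std::uint32_t prev_count,
                                       std::uint32_t curr_count,
                                       double joules_per_count,
                                       std::chrono::duration<double> elapsed) {
    if (elapsed.count() <= 0.0) {
        return std::nullopt;  // no interval, so no meaningful power figure
    }
    // Unsigned subtraction absorbs a single counter wraparound.
    const std::uint32_t delta = curr_count - prev_count;
    const double joules = static_cast<double>(delta) * joules_per_count;
    return joules / elapsed.count();  // watts = joules / seconds
}
```

A provider built this way would report a reading as invalid whenever the helper
returns `std::nullopt`, for example on the very first poll when no previous
count exists.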

`TelemetryBusManager` owns the collector list and polls each collector from the
fusion thread. The concrete collectors derive from `PeriodicCollector`, which
enforces each collector's polling interval, and `TelemetryBus` guards its
readings with an internal mutex so concurrent publishes and reads stay
consistent.
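
The cadence check can be sketched as a small gate around the sampling
callback. `PeriodicGate` is a hypothetical stand-in written for this document:
it mirrors the `interval_` / `next_run_` members declared on
`PeriodicCollector`, but the scheduling policy shown (next run is measured from
the time of the last sample) is an assumption rather than the behaviour of
`collector.cpp`.

```cpp
#include <chrono>
#include <functional>
#include <utility>

using Clock = std::chrono::steady_clock;

// Hypothetical stand-in for PeriodicCollector; the real class also receives
// a TelemetryBus reference in collect() and sample().
class PeriodicGate {
public:
    PeriodicGate(std::chrono::milliseconds interval,
                 std::function<void(Clock::time_point)> sample)
        : interval_(interval), sample_(std::move(sample)) {}

    // Runs the sample callback only when the interval has elapsed.
    // Returns true if the callback ran on this call.
    bool collect(Clock::time_point now) {
        if (now < next_run_) {
            return false;  // polled too early, skip this cycle
        }
        sample_(now);
        next_run_ = now + interval_;  // assumed policy: schedule from "now"
        return true;
    }

private:
    std::chrono::milliseconds interval_;
    std::function<void(Clock::time_point)> sample_;
    Clock::time_point next_run_{};  // clock epoch, so the first call samples
};
```

With a gate like this the fusion thread can poll every collector on each cycle
and let slow sources skip cycles on their own.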

## Fusion Thread & Snapshots

`TelemetryFusion` owns a background thread that polls registered collectors,
aggregates their latest readings, and emits immutable `TelemetrySnapshot`
instances into a lock-protected ring buffer. Freshness is enforced with a
configurable window (`TelemetryFusionConfig::freshness_window`); any signal that
misses the window is marked unavailable and the snapshot is flagged as
`degraded` so the controller can fall back.
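
A minimal sketch of the freshness rule follows. The `Reading` and `Snapshot`
structs are trimmed stand-ins for `TelemetryReading` and `TelemetrySnapshot`,
and `assign_if_fresh` is a hypothetical free function loosely modelled on the
private `assign_value()` declaration. Treating a reading exactly at the window
boundary as fresh is an assumption.

```cpp
#include <chrono>
#include <optional>

using Clock = std::chrono::steady_clock;

// Trimmed stand-ins for TelemetryReading and TelemetrySnapshot.
struct Reading {
    double value = 0.0;
    bool valid = false;
    Clock::time_point timestamp{};
};

struct Snapshot {
    bool degraded = false;
    bool temp_available = false;
    double package_temp_c = 0.0;
};

// Copies the reading into the output slot if it is valid and inside the
// freshness window; otherwise marks the signal unavailable and sets the
// degraded flag. Returns true when the reading was used.
bool assign_if_fresh(const std::optional<Reading> &reading,
                     Clock::time_point now,
                     std::chrono::milliseconds window,
                     double &out_value,
                     bool &out_flag,
                     bool &degraded) {
    const bool fresh = reading.has_value() && reading->valid &&
                       (now - reading->timestamp) <= window;
    if (fresh) {
        out_value = reading->value;
        out_flag = true;
    } else {
        out_flag = false;
        degraded = true;  // one stale signal degrades the whole snapshot
    }
    return fresh;
}
```

In this sketch the degraded flag is sticky within one fuse pass: once any
signal misses the window, later fresh signals do not clear it.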

Consumers can call `latest_snapshot()` for a non-blocking read, or
`wait_for_snapshot()` to block until a snapshot of at least the requested
generation is published or the timeout expires. The ring buffer size is
configurable (`TelemetryFusionConfig::ring_size`, 64 by default) so that fast
polling cadences do not block producers.
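
The two read paths can be illustrated with a reduced ring buffer. `SnapshotRing`
is a hypothetical, simplified version of `TelemetrySnapshotRingBuffer` that
keeps the same member layout (mutex, condition variable, `head_`,
`last_generation_`); the method bodies are a sketch of one plausible
implementation, not the code in `fusion.cpp`.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <optional>
#include <vector>

struct Snapshot {
    std::uint64_t generation = 0;
    bool degraded = false;
};

class SnapshotRing {
public:
    explicit SnapshotRing(std::size_t capacity) : buffer_(capacity) {
        assert(capacity > 0);
    }

    // Overwrites the oldest slot and wakes any blocked readers.
    void publish(const Snapshot &snapshot) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            buffer_[head_ % buffer_.size()] = snapshot;
            ++head_;
            last_generation_ = snapshot.generation;
        }
        cv_.notify_all();
    }

    // Non-blocking read of the newest snapshot, if any was published.
    std::optional<Snapshot> latest() const {
        std::lock_guard<std::mutex> lock(mutex_);
        if (head_ == 0) return std::nullopt;
        return buffer_[(head_ - 1) % buffer_.size()];
    }

    // Blocks until a snapshot with generation >= `generation` exists,
    // or returns std::nullopt once the timeout expires.
    std::optional<Snapshot> wait_for(std::uint64_t generation,
                                     std::chrono::milliseconds timeout) const {
        std::unique_lock<std::mutex> lock(mutex_);
        const bool ready = cv_.wait_for(lock, timeout, [&] {
            return head_ > 0 && last_generation_ >= generation;
        });
        if (!ready) return std::nullopt;
        return buffer_[(head_ - 1) % buffer_.size()];
    }

private:
    mutable std::mutex mutex_;
    mutable std::condition_variable cv_;
    std::vector<Snapshot> buffer_;
    std::uint64_t head_ = 0;
    std::uint64_t last_generation_ = 0;
};
```

Because `publish()` overwrites the oldest slot instead of waiting for readers,
the producer never blocks on a slow consumer; a consumer that falls behind
simply sees a newer generation than the one it asked for.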

## Controller Integration

`tsd_perf_init()` now starts the fusion thread via
`tsd_telemetry_fusion_start()`, and `tsd_perf_cleanup()` stops it. During perf
evaluations the runtime first tries `tsd_telemetry_fusion_sample()` to obtain a
fused frame before falling back to the legacy synchronous helper. This keeps the
predictive controller aligned with fused telemetry while preserving existing
fallback behaviour when collectors are unavailable.
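
The fused-first, legacy-second ordering can be sketched as below. Everything
here is hypothetical: the `Frame` struct, the `FrameSource` enum, and
`sample_with_fallback` are illustration names, and the two callbacks stand in
for `tsd_telemetry_fusion_sample()` and the legacy synchronous helper, whose
real C signatures are not shown in this excerpt. The sketch falls back only
when no fused frame is available; how the runtime treats a frame that is
present but `degraded` is not specified here.

```cpp
#include <functional>
#include <optional>

// Hypothetical frame type; the real fused sample layout is not shown here.
struct Frame {
    double package_temp_c = 0.0;
    bool degraded = false;
};

enum class FrameSource { kFused, kLegacy, kNone };

struct Evaluation {
    FrameSource source;
    Frame frame;
};

using FrameProvider = std::function<std::optional<Frame>()>;

// Tries the fused provider first and only calls the legacy provider when
// the fused one yields nothing.
Evaluation sample_with_fallback(const FrameProvider &fused,
                                const FrameProvider &legacy) {
    if (auto frame = fused()) {
        return {FrameSource::kFused, *frame};
    }
    if (auto frame = legacy()) {
        return {FrameSource::kLegacy, *frame};
    }
    return {FrameSource::kNone, Frame{}};
}
```

Recording which source produced the frame keeps the fallback observable, which
is useful when checking that the fusion thread is actually feeding the
controller.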

## Testing

Two new test binaries exercise the subsystem:

* `test_telemetry_fusion` – integration test that verifies collectors publish to
the bus, snapshots respect freshness, and degradation flags propagate.
* `test_telemetry_fusion_stress` – stress harness that ensures the ring buffer
and fusion thread remain responsive over rapid polling cycles.

Both tests live under `tests/telemetry/` and are wired into CTest.

59 changes: 59 additions & 0 deletions include/telemetry/bus.h
@@ -0,0 +1,59 @@
#pragma once

#include <chrono>
#include <memory>
#include <mutex>
#include <optional>
#include <unordered_map>
#include <vector>

namespace telemetry {

enum class TelemetrySignal {
    kPackageTempC,
    kFrequencyRatio,
    kThermalCpi,
    kPowerBudgetWatts,
};

struct TelemetryReading {
    double value = 0.0;
    bool valid = false;
    int quality = 0;
    std::chrono::steady_clock::time_point timestamp{};
};

class TelemetryBus {
public:
    TelemetryBus();

    void publish(TelemetrySignal signal, const TelemetryReading &reading);
    std::optional<TelemetryReading> latest(TelemetrySignal signal) const;

private:
    using ReadingMap = std::unordered_map<TelemetrySignal, TelemetryReading>;

    mutable std::mutex mutex_;
    ReadingMap readings_;
};

class TelemetryCollector;

class TelemetryBusManager {
public:
    TelemetryBusManager();

    void set_bus(std::shared_ptr<TelemetryBus> bus);
    std::shared_ptr<TelemetryBus> bus() const;

    void add_collector(std::shared_ptr<TelemetryCollector> collector);
    std::vector<std::shared_ptr<TelemetryCollector>> collectors() const;
    void poll(std::chrono::steady_clock::time_point now);

private:
    std::shared_ptr<TelemetryBus> bus_;
    std::vector<std::shared_ptr<TelemetryCollector>> collectors_;
};

} // namespace telemetry

121 changes: 121 additions & 0 deletions include/telemetry/collector.h
@@ -0,0 +1,121 @@
#pragma once

#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <vector>

#include <telemetry/bus.h>

namespace telemetry {

struct PerfSample {
    double thermal_cpi = 0.0;
    double freq_hint = 0.0;
    bool valid = false;
};

struct TemperatureSample {
    double package_temp_c = 0.0;
    bool valid = false;
};

struct RaplSample {
    double power_budget_w = 0.0;
    bool valid = false;
};

using PerfSampleProvider = std::function<PerfSample()>;
using TemperatureSampleProvider = std::function<TemperatureSample()>;
using RaplSampleProvider = std::function<RaplSample()>;

class TelemetryCollector {
public:
    virtual ~TelemetryCollector() = default;

    virtual std::string name() const = 0;
    virtual void collect(TelemetryBus &bus, std::chrono::steady_clock::time_point now) = 0;
};

class PeriodicCollector : public TelemetryCollector {
public:
    PeriodicCollector(std::string name, std::chrono::milliseconds interval);

    std::string name() const override;
    void collect(TelemetryBus &bus, std::chrono::steady_clock::time_point now) override;

protected:
    virtual void sample(TelemetryBus &bus, std::chrono::steady_clock::time_point now) = 0;

private:
    std::string name_;
    std::chrono::milliseconds interval_;
    std::chrono::steady_clock::time_point next_run_;
};

class PerfCollector : public PeriodicCollector {
public:
    PerfCollector(std::chrono::milliseconds interval, PerfSampleProvider provider, int quality = 0);

protected:
    void sample(TelemetryBus &bus, std::chrono::steady_clock::time_point now) override;

private:
    PerfSampleProvider provider_;
    int quality_;
};

class MsrCollector : public PeriodicCollector {
public:
    MsrCollector(std::chrono::milliseconds interval, TemperatureSampleProvider provider, int quality = 0);

protected:
    void sample(TelemetryBus &bus, std::chrono::steady_clock::time_point now) override;

private:
    TemperatureSampleProvider provider_;
    int quality_;
};

class RaplCollector : public PeriodicCollector {
public:
    RaplCollector(std::chrono::milliseconds interval, RaplSampleProvider provider, int quality = 0);

protected:
    void sample(TelemetryBus &bus, std::chrono::steady_clock::time_point now) override;

private:
    RaplSampleProvider provider_;
    int quality_;
};

class FreqCollector : public PeriodicCollector {
public:
    FreqCollector(std::chrono::milliseconds interval, PerfSampleProvider provider, int quality = 0);

protected:
    void sample(TelemetryBus &bus, std::chrono::steady_clock::time_point now) override;

private:
    PerfSampleProvider provider_;
    int quality_;
};

class OemCollector : public PeriodicCollector {
public:
    OemCollector(std::chrono::milliseconds interval, TemperatureSampleProvider provider, int quality = 0);

protected:
    void sample(TelemetryBus &bus, std::chrono::steady_clock::time_point now) override;

private:
    TemperatureSampleProvider provider_;
    int quality_;
};

using TelemetryCollectorPtr = std::shared_ptr<TelemetryCollector>;
using TelemetryCollectorList = std::vector<TelemetryCollectorPtr>;

} // namespace telemetry

99 changes: 99 additions & 0 deletions include/telemetry/fusion.h
@@ -0,0 +1,99 @@
#pragma once

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <vector>

#include <telemetry/bus.h>
#include <telemetry/collector.h>

namespace telemetry {

struct TelemetrySnapshot {
    uint64_t generation = 0;
    std::chrono::steady_clock::time_point capture_time{};
    bool degraded = false;

    bool temp_available = false;
    double package_temp_c = 0.0;

    bool freq_available = false;
    double freq_ratio = 0.0;

    bool cpi_available = false;
    double thermal_cpi = 0.0;

    bool power_available = false;
    double power_budget_w = 0.0;
};

struct TelemetryFusionConfig {
    std::chrono::milliseconds poll_interval{50};
    std::chrono::milliseconds freshness_window{100};
    std::size_t ring_size = 64;
};

class TelemetrySnapshotRingBuffer {
public:
    explicit TelemetrySnapshotRingBuffer(std::size_t capacity);

    void publish(const TelemetrySnapshot &snapshot);
    std::optional<TelemetrySnapshot> latest() const;
    std::optional<TelemetrySnapshot> wait_for(uint64_t generation,
                                              std::chrono::milliseconds timeout) const;

private:
    std::size_t capacity_;
    mutable std::mutex mutex_;
    mutable std::condition_variable cv_;
    std::vector<TelemetrySnapshot> buffer_;
    uint64_t head_;
    uint64_t last_generation_;
};

class TelemetryFusion {
public:
    TelemetryFusion(TelemetryFusionConfig config,
                    std::shared_ptr<TelemetryBusManager> bus_manager);
    ~TelemetryFusion();

    TelemetryFusion(const TelemetryFusion &) = delete;
    TelemetryFusion &operator=(const TelemetryFusion &) = delete;

    void start();
    void stop();
    bool running() const;

    void register_collector(TelemetryCollectorPtr collector);

    std::optional<TelemetrySnapshot> latest_snapshot() const;
    std::optional<TelemetrySnapshot> wait_for_snapshot(uint64_t generation,
                                                       std::chrono::milliseconds timeout) const;

private:
    void run();
    TelemetrySnapshot fuse(std::chrono::steady_clock::time_point now);
    bool assign_value(TelemetrySnapshot &snapshot,
                      TelemetrySignal signal,
                      double &out_value,
                      bool &out_flag,
                      std::chrono::steady_clock::time_point now);

    TelemetryFusionConfig config_;
    std::shared_ptr<TelemetryBusManager> bus_manager_;
    std::shared_ptr<TelemetryBus> bus_;
    TelemetrySnapshotRingBuffer ring_;
    std::atomic<bool> running_;
    std::atomic<uint64_t> generation_;
    mutable std::mutex thread_mutex_;
    std::thread thread_;
};

} // namespace telemetry
