Commit 5255eec

Add telemetry fusion collectors and controller integration
1 parent 9807692 commit 5255eec

13 files changed, with 924 additions and 4 deletions.

CMakeLists.txt

Lines changed: 14 additions & 0 deletions
@@ -27,6 +27,10 @@ set(THERMAL_SIMD_CORE_SOURCES
     src/telemetry/evaluator.cpp
     src/telemetry/history_store.cpp
     src/telemetry/sensors.cpp
+    src/telemetry/bus.cpp
+    src/telemetry/collector.cpp
+    src/telemetry/fusion.cpp
+    src/telemetry/fusion_bridge.cpp
     src/observability/metrics.cpp
     src/thermal_config.c
     src/thermal_cpu.c
@@ -132,6 +136,16 @@ if(BUILD_TESTING)
     target_compile_options(test_telemetry PRIVATE -Wall -Wextra)
     add_test(NAME telemetry_adapters COMMAND test_telemetry)

+    add_executable(test_telemetry_fusion tests/telemetry/test_fusion_thread.cpp)
+    target_link_libraries(test_telemetry_fusion PRIVATE thermal_simd_core_tests pthread)
+    target_compile_options(test_telemetry_fusion PRIVATE -Wall -Wextra -pthread)
+    add_test(NAME telemetry_fusion_thread COMMAND test_telemetry_fusion)
+
+    add_executable(test_telemetry_fusion_stress tests/telemetry/test_fusion_stress.cpp)
+    target_link_libraries(test_telemetry_fusion_stress PRIVATE thermal_simd_core_tests pthread)
+    target_compile_options(test_telemetry_fusion_stress PRIVATE -Wall -Wextra -pthread)
+    add_test(NAME telemetry_fusion_stress COMMAND test_telemetry_fusion_stress)
+
     add_executable(test_trampoline_security tests/patcher/test_trampoline_security.cpp)
     target_link_libraries(test_trampoline_security PRIVATE thermal_simd_core_tests pthread)
     target_compile_definitions(test_trampoline_security PRIVATE TSD_ENABLE_TESTS)

docs/telemetry.md

Lines changed: 78 additions & 0 deletions
@@ -0,0 +1,78 @@
# Telemetry Subsystem Overview

The telemetry subsystem normalizes hardware sensor readings, fuses them into
consistent snapshots, and publishes those frames to the predictive controller.
This document describes the new collector abstractions, the telemetry bus
manager, and the fusion thread that feeds controller consumers.

## Module Layout

The subsystem lives under `src/telemetry/` with public headers in
`include/telemetry/`:

| Path | Description |
| ---- | ----------- |
| `include/telemetry/bus.h` | Shared `TelemetryBus` state and manager facade. |
| `include/telemetry/collector.h` | Collector interfaces plus concrete collectors for perf, MSR, RAPL, frequency, and OEM sources. |
| `include/telemetry/fusion.h` | Ring buffer, fusion thread, and snapshot types. |
| `src/telemetry/bus.cpp` | `TelemetryBus` and manager implementation. |
| `src/telemetry/collector.cpp` | Periodic collector helpers and concrete collectors. |
| `src/telemetry/fusion.cpp` | Fusion thread, snapshot ring buffer, and freshness logic. |
| `src/telemetry/fusion_bridge.cpp` | C bridge that exposes fusion outputs to the legacy runtime. |

The bridge exports the `tsd_telemetry_fusion_*` C helpers declared in
`include/thermal/simd/telemetry_fusion.h` so C modules can start/stop the fusion
thread and fetch fused telemetry samples.

## Collectors & Bus Manager

Each collector implements the `TelemetryCollector` interface and publishes
`TelemetryReading` values onto the shared `TelemetryBus`:

* `PerfCollector` – provides CPI and frequency ratio hints derived from perf
  counters.
* `MsrCollector` – ingests package/core temperatures from IA32 thermal MSRs.
* `RaplCollector` – translates RAPL energy deltas into instantaneous power
  budgets.
* `FreqCollector` – polls cpufreq sysfs interfaces such as `cpuinfo_cur_freq`
  for turbo residency hints.
* `OemCollector` – optionally sources OEM/PMBus temperatures with higher
  quality scores.

`TelemetryBusManager` owns the collector list and coordinates polling on the
fusion thread. Collectors are `PeriodicCollector` derivatives that enforce their
own polling cadence while protecting the bus from concurrent updates.
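
The cadence gating described above can be sketched as follows. This is a minimal illustration, assuming `collect()` compares `now` against a stored deadline and only then calls `sample()`. The body of `collector.cpp` is not part of this excerpt, so `PeriodicSketch`, `BusStub`, and `CountingCollector` are hypothetical stand-ins rather than the committed implementation.

```cpp
#include <chrono>
#include <string>
#include <utility>

using Clock = std::chrono::steady_clock;

// Stand-in for telemetry::TelemetryBus; the real class is declared in bus.h.
struct BusStub {};

// Sketch of a collector that enforces its own polling cadence.
class PeriodicSketch {
public:
    PeriodicSketch(std::string name, std::chrono::milliseconds interval)
        : name_(std::move(name)), interval_(interval), next_run_() {}
    virtual ~PeriodicSketch() = default;

    const std::string &name() const { return name_; }

    // Called on every fusion poll; samples only once the interval has elapsed.
    void collect(BusStub &bus, Clock::time_point now) {
        if (now < next_run_) {
            return;  // too early, skip this poll
        }
        next_run_ = now + interval_;
        sample(bus, now);
    }

protected:
    virtual void sample(BusStub &bus, Clock::time_point now) = 0;

private:
    std::string name_;
    std::chrono::milliseconds interval_;
    Clock::time_point next_run_;  // starts at the clock epoch, so the first poll samples
};

// Hypothetical collector that only counts how often it was sampled.
class CountingCollector : public PeriodicSketch {
public:
    using PeriodicSketch::PeriodicSketch;
    int samples = 0;

protected:
    void sample(BusStub &, Clock::time_point) override { ++samples; }
};
```

With this shape the manager can call `collect()` on every collector at the fusion poll rate, and a collector with a longer interval simply ignores the polls that arrive too early.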

## Fusion Thread & Snapshots

`TelemetryFusion` owns a background thread that polls registered collectors,
aggregates their latest readings, and emits immutable `TelemetrySnapshot`
instances into a lock-protected ring buffer. Freshness is enforced with a
configurable window (`TelemetryFusionConfig::freshness_window`); any signal that
misses the window is marked unavailable and the snapshot is flagged as
`degraded` so the controller can fall back.
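
The freshness rule can be illustrated with a small sketch. It assumes a reading counts as fresh when it is valid and its age does not exceed the window; `fusion.cpp` is not shown in this excerpt, so `is_fresh`, `fuse_temp`, and the reduced `SnapshotSketch` type are hypothetical and cover a single signal only.

```cpp
#include <chrono>
#include <optional>

using Clock = std::chrono::steady_clock;

// Mirrors the fields of telemetry::TelemetryReading from bus.h.
struct Reading {
    double value = 0.0;
    bool valid = false;
    int quality = 0;
    Clock::time_point timestamp{};
};

// Reduced snapshot with a single signal, for illustration only.
struct SnapshotSketch {
    bool degraded = false;
    bool temp_available = false;
    double package_temp_c = 0.0;
};

// Returns true when the reading exists, is valid, and is inside the window.
inline bool is_fresh(const std::optional<Reading> &reading,
                     Clock::time_point now,
                     std::chrono::milliseconds window) {
    if (!reading || !reading->valid) {
        return false;
    }
    return now - reading->timestamp <= window;
}

// Copies a fresh value into the snapshot, otherwise flags it as degraded.
inline SnapshotSketch fuse_temp(const std::optional<Reading> &reading,
                                Clock::time_point now,
                                std::chrono::milliseconds window) {
    SnapshotSketch snap;
    if (is_fresh(reading, now, window)) {
        snap.temp_available = true;
        snap.package_temp_c = reading->value;
    } else {
        snap.degraded = true;
    }
    return snap;
}
```

With the default 100 ms window from `TelemetryFusionConfig`, a reading that is 40 ms old would be used, while one that is 250 ms old would leave `temp_available` false and set `degraded`.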

Consumers can either call `latest_snapshot()` for a non-blocking read or
`wait_for_snapshot()` to block on a minimum generation. The ring buffer size is
configurable to cover fast polling cadences without blocking producers.
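
One way to get both access patterns is a mutex-protected ring that overwrites its oldest slot and signals a condition variable on every publish. The sketch below is an assumption about how such a buffer might look, not the code in `fusion.cpp`; `RingSketch` and `Snap` are hypothetical names, and the snapshot is reduced to two fields.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <optional>
#include <vector>

// Reduced snapshot: only the generation counter matters for this sketch.
struct Snap {
    std::uint64_t generation = 0;
    double package_temp_c = 0.0;
};

class RingSketch {
public:
    explicit RingSketch(std::size_t capacity)
        : buffer_(capacity == 0 ? 1 : capacity) {}

    // Overwrites the oldest slot, so the producer never waits for readers.
    void publish(const Snap &snap) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            buffer_[head_ % buffer_.size()] = snap;
            ++head_;
            last_generation_ = snap.generation;
        }
        cv_.notify_all();
    }

    // Non-blocking read of the most recent snapshot, if one was published.
    std::optional<Snap> latest() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return latest_locked();
    }

    // Blocks until a snapshot with at least `generation` exists or the timeout expires.
    std::optional<Snap> wait_for(std::uint64_t generation,
                                 std::chrono::milliseconds timeout) const {
        std::unique_lock<std::mutex> lock(mutex_);
        const bool ready = cv_.wait_for(lock, timeout, [&] {
            return head_ > 0 && last_generation_ >= generation;
        });
        if (!ready) {
            return std::nullopt;
        }
        return latest_locked();
    }

private:
    // Caller must hold mutex_.
    std::optional<Snap> latest_locked() const {
        if (head_ == 0) {
            return std::nullopt;
        }
        return buffer_[(head_ - 1) % buffer_.size()];
    }

    mutable std::mutex mutex_;
    mutable std::condition_variable cv_;
    std::vector<Snap> buffer_;
    std::uint64_t head_ = 0;
    std::uint64_t last_generation_ = 0;
};
```

Because `publish()` always overwrites rather than waiting for free space, a slow consumer can miss intermediate generations but cannot stall the fusion thread.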

## Controller Integration

`tsd_perf_init()` now starts the fusion thread via
`tsd_telemetry_fusion_start()`, and `tsd_perf_cleanup()` stops it. During perf
evaluations the runtime first tries `tsd_telemetry_fusion_sample()` to obtain a
fused frame before falling back to the legacy synchronous helper. This keeps the
predictive controller aligned with fused telemetry while preserving existing
fallback behaviour when collectors are unavailable.
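
The "fused first, legacy second" order can be expressed as a small helper. The signatures of the `tsd_telemetry_fusion_*` functions are not shown in this excerpt, so the sketch below does not call them; `FrameSketch`, `FusedSource`, `LegacySource`, and `sample_with_fallback` are hypothetical names that only illustrate the control flow.

```cpp
#include <functional>
#include <optional>

// Hypothetical frame type; the real C struct lives in telemetry_fusion.h
// and is not shown in this excerpt.
struct FrameSketch {
    double package_temp_c = 0.0;
    bool from_fusion = false;
};

// A fused source may have nothing to offer; the legacy source always answers.
using FusedSource = std::function<std::optional<FrameSketch>()>;
using LegacySource = std::function<FrameSketch()>;

// Prefer the fused frame; fall back to the synchronous legacy read otherwise.
inline FrameSketch sample_with_fallback(const FusedSource &fused,
                                        const LegacySource &legacy) {
    if (fused) {
        if (std::optional<FrameSketch> frame = fused()) {
            return *frame;
        }
    }
    return legacy();
}
```

A real integration would probably also inspect the snapshot's `degraded` flag before trusting the fused frame; that policy is left out here because the excerpt does not specify it.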

## Testing

Two new test binaries exercise the subsystem:

* `test_telemetry_fusion` – integration test that verifies collectors publish to
  the bus, snapshots respect freshness, and degradation flags propagate.
* `test_telemetry_fusion_stress` – stress harness that ensures the ring buffer
  and fusion thread remain responsive over rapid polling cycles.

Both tests live under `tests/telemetry/` and are wired into CTest.

include/telemetry/bus.h

Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
#pragma once

#include <chrono>
#include <memory>
#include <mutex>
#include <optional>
#include <unordered_map>
#include <vector>

namespace telemetry {

enum class TelemetrySignal {
    kPackageTempC,
    kFrequencyRatio,
    kThermalCpi,
    kPowerBudgetWatts,
};

struct TelemetryReading {
    double value = 0.0;
    bool valid = false;
    int quality = 0;
    std::chrono::steady_clock::time_point timestamp{};
};

class TelemetryBus {
public:
    TelemetryBus();

    void publish(TelemetrySignal signal, const TelemetryReading &reading);
    std::optional<TelemetryReading> latest(TelemetrySignal signal) const;

private:
    using ReadingMap = std::unordered_map<TelemetrySignal, TelemetryReading>;

    mutable std::mutex mutex_;
    ReadingMap readings_;
};

class TelemetryCollector;

class TelemetryBusManager {
public:
    TelemetryBusManager();

    void set_bus(std::shared_ptr<TelemetryBus> bus);
    std::shared_ptr<TelemetryBus> bus() const;

    void add_collector(std::shared_ptr<TelemetryCollector> collector);
    std::vector<std::shared_ptr<TelemetryCollector>> collectors() const;
    void poll(std::chrono::steady_clock::time_point now);

private:
    std::shared_ptr<TelemetryBus> bus_;
    std::vector<std::shared_ptr<TelemetryCollector>> collectors_;
};

} // namespace telemetry
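
For orientation, the `publish()`/`latest()` pair declared in this header could be implemented roughly as below. This is a sketch under the assumption of last-writer-wins semantics guarded by the mutex; `bus.cpp` is not shown in this excerpt and may arbitrate by `quality` instead. `BusSketch`, `Signal`, and `BusReading` are hypothetical stand-ins for the real types.

```cpp
#include <chrono>
#include <mutex>
#include <optional>
#include <unordered_map>

// Stand-ins mirroring TelemetrySignal and TelemetryReading from the header.
enum class Signal { kPackageTempC, kFrequencyRatio, kThermalCpi, kPowerBudgetWatts };

struct BusReading {
    double value = 0.0;
    bool valid = false;
    int quality = 0;
    std::chrono::steady_clock::time_point timestamp{};
};

class BusSketch {
public:
    // Stores the reading for the signal, replacing any previous one.
    void publish(Signal signal, const BusReading &reading) {
        std::lock_guard<std::mutex> lock(mutex_);
        readings_[signal] = reading;
    }

    // Returns a copy of the stored reading, or nullopt if none was published.
    std::optional<BusReading> latest(Signal signal) const {
        std::lock_guard<std::mutex> lock(mutex_);
        const auto it = readings_.find(signal);
        if (it == readings_.end()) {
            return std::nullopt;
        }
        return it->second;
    }

private:
    mutable std::mutex mutex_;  // mutable so the const reader can lock it
    std::unordered_map<Signal, BusReading> readings_;
};
```

Returning a copy from `latest()` keeps the lock scope short and means callers never hold a reference into the map while a collector is publishing.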

include/telemetry/collector.h

Lines changed: 121 additions & 0 deletions
@@ -0,0 +1,121 @@
#pragma once

#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <vector>

#include <telemetry/bus.h>

namespace telemetry {

struct PerfSample {
    double thermal_cpi = 0.0;
    double freq_hint = 0.0;
    bool valid = false;
};

struct TemperatureSample {
    double package_temp_c = 0.0;
    bool valid = false;
};

struct RaplSample {
    double power_budget_w = 0.0;
    bool valid = false;
};

using PerfSampleProvider = std::function<PerfSample()>;
using TemperatureSampleProvider = std::function<TemperatureSample()>;
using RaplSampleProvider = std::function<RaplSample()>;

class TelemetryCollector {
public:
    virtual ~TelemetryCollector() = default;

    virtual std::string name() const = 0;
    virtual void collect(TelemetryBus &bus, std::chrono::steady_clock::time_point now) = 0;
};

class PeriodicCollector : public TelemetryCollector {
public:
    PeriodicCollector(std::string name, std::chrono::milliseconds interval);

    std::string name() const override;
    void collect(TelemetryBus &bus, std::chrono::steady_clock::time_point now) override;

protected:
    virtual void sample(TelemetryBus &bus, std::chrono::steady_clock::time_point now) = 0;

private:
    std::string name_;
    std::chrono::milliseconds interval_;
    std::chrono::steady_clock::time_point next_run_;
};

class PerfCollector : public PeriodicCollector {
public:
    PerfCollector(std::chrono::milliseconds interval, PerfSampleProvider provider, int quality = 0);

protected:
    void sample(TelemetryBus &bus, std::chrono::steady_clock::time_point now) override;

private:
    PerfSampleProvider provider_;
    int quality_;
};

class MsrCollector : public PeriodicCollector {
public:
    MsrCollector(std::chrono::milliseconds interval, TemperatureSampleProvider provider, int quality = 0);

protected:
    void sample(TelemetryBus &bus, std::chrono::steady_clock::time_point now) override;

private:
    TemperatureSampleProvider provider_;
    int quality_;
};

class RaplCollector : public PeriodicCollector {
public:
    RaplCollector(std::chrono::milliseconds interval, RaplSampleProvider provider, int quality = 0);

protected:
    void sample(TelemetryBus &bus, std::chrono::steady_clock::time_point now) override;

private:
    RaplSampleProvider provider_;
    int quality_;
};

class FreqCollector : public PeriodicCollector {
public:
    FreqCollector(std::chrono::milliseconds interval, PerfSampleProvider provider, int quality = 0);

protected:
    void sample(TelemetryBus &bus, std::chrono::steady_clock::time_point now) override;

private:
    PerfSampleProvider provider_;
    int quality_;
};

class OemCollector : public PeriodicCollector {
public:
    OemCollector(std::chrono::milliseconds interval, TemperatureSampleProvider provider, int quality = 0);

protected:
    void sample(TelemetryBus &bus, std::chrono::steady_clock::time_point now) override;

private:
    TemperatureSampleProvider provider_;
    int quality_;
};

using TelemetryCollectorPtr = std::shared_ptr<TelemetryCollector>;
using TelemetryCollectorList = std::vector<TelemetryCollectorPtr>;

} // namespace telemetry
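
The provider-based constructors suggest that each collector pulls a sample from an injected callable and converts it into a timestamped reading tagged with its `quality`. A minimal sketch of that conversion follows, assuming invalid samples are simply skipped; the real `sample()` bodies are in `collector.cpp`, which this excerpt does not show. `TempCollectorSketch`, `TempSample`, and `PublishedReading` are hypothetical names.

```cpp
#include <chrono>
#include <functional>
#include <optional>
#include <utility>

using Clock = std::chrono::steady_clock;

// Stand-in for telemetry::TemperatureSample.
struct TempSample {
    double package_temp_c = 0.0;
    bool valid = false;
};
using TempProvider = std::function<TempSample()>;

// Stand-in for the reading a collector would publish to the bus.
struct PublishedReading {
    double value;
    bool valid;
    int quality;
    Clock::time_point timestamp;
};

class TempCollectorSketch {
public:
    TempCollectorSketch(TempProvider provider, int quality)
        : provider_(std::move(provider)), quality_(quality) {}

    // Pulls one sample and converts it into a timestamped reading.
    // Returns nullopt when there is no provider or the sample is invalid.
    std::optional<PublishedReading> sample(Clock::time_point now) const {
        if (!provider_) {
            return std::nullopt;
        }
        const TempSample s = provider_();
        if (!s.valid) {
            return std::nullopt;
        }
        return PublishedReading{s.package_temp_c, true, quality_, now};
    }

private:
    TempProvider provider_;
    int quality_;
};
```

Injecting the provider as a `std::function` lets tests substitute a lambda for the hardware read, which is presumably how the new fusion tests drive the collectors without touching MSRs or RAPL counters.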

include/telemetry/fusion.h

Lines changed: 99 additions & 0 deletions
@@ -0,0 +1,99 @@
#pragma once

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <vector>

#include <telemetry/bus.h>
#include <telemetry/collector.h>

namespace telemetry {

struct TelemetrySnapshot {
    uint64_t generation = 0;
    std::chrono::steady_clock::time_point capture_time{};
    bool degraded = false;

    bool temp_available = false;
    double package_temp_c = 0.0;

    bool freq_available = false;
    double freq_ratio = 0.0;

    bool cpi_available = false;
    double thermal_cpi = 0.0;

    bool power_available = false;
    double power_budget_w = 0.0;
};

struct TelemetryFusionConfig {
    std::chrono::milliseconds poll_interval{50};
    std::chrono::milliseconds freshness_window{100};
    std::size_t ring_size = 64;
};

class TelemetrySnapshotRingBuffer {
public:
    explicit TelemetrySnapshotRingBuffer(std::size_t capacity);

    void publish(const TelemetrySnapshot &snapshot);
    std::optional<TelemetrySnapshot> latest() const;
    std::optional<TelemetrySnapshot> wait_for(uint64_t generation,
                                              std::chrono::milliseconds timeout) const;

private:
    std::size_t capacity_;
    mutable std::mutex mutex_;
    mutable std::condition_variable cv_;
    std::vector<TelemetrySnapshot> buffer_;
    uint64_t head_;
    uint64_t last_generation_;
};

class TelemetryFusion {
public:
    TelemetryFusion(TelemetryFusionConfig config,
                    std::shared_ptr<TelemetryBusManager> bus_manager);
    ~TelemetryFusion();

    TelemetryFusion(const TelemetryFusion &) = delete;
    TelemetryFusion &operator=(const TelemetryFusion &) = delete;

    void start();
    void stop();
    bool running() const;

    void register_collector(TelemetryCollectorPtr collector);

    std::optional<TelemetrySnapshot> latest_snapshot() const;
    std::optional<TelemetrySnapshot> wait_for_snapshot(uint64_t generation,
                                                       std::chrono::milliseconds timeout) const;

private:
    void run();
    TelemetrySnapshot fuse(std::chrono::steady_clock::time_point now);
    bool assign_value(TelemetrySnapshot &snapshot,
                      TelemetrySignal signal,
                      double &out_value,
                      bool &out_flag,
                      std::chrono::steady_clock::time_point now);

    TelemetryFusionConfig config_;
    std::shared_ptr<TelemetryBusManager> bus_manager_;
    std::shared_ptr<TelemetryBus> bus_;
    TelemetrySnapshotRingBuffer ring_;
    std::atomic<bool> running_;
    std::atomic<uint64_t> generation_;
    mutable std::mutex thread_mutex_;
    std::thread thread_;
};

} // namespace telemetry
