Skip to content

Commit c2ee10e

Browse files
committed
feat: 🌈 are prometheus remote reads actually *working* ?
1 parent a28d2e8 commit c2ee10e

File tree

6 files changed

+219
-70
lines changed

6 files changed

+219
-70
lines changed

‎.gitignore‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,5 @@ test.db*
2626

2727
# Claude
2828
.claude/settings.local.json
29+
30+
reference/

‎src/ingestors/http/prometheus_read.rs‎

Lines changed: 80 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,13 @@ pub async fn prometheus_remote_read(
124124

125125
// Log detailed information about each query for debugging
126126
for (i, query) in read_request.queries.iter().enumerate() {
127+
println!(
128+
"[DEBUG PROM READ] Query {}: time range {}ms - {}ms ({} matchers)",
129+
i,
130+
query.start_timestamp_ms,
131+
query.end_timestamp_ms,
132+
query.matchers.len()
133+
);
127134
info!(
128135
"Query {}: time range {}ms - {}ms ({} matchers)",
129136
i,
@@ -133,6 +140,10 @@ pub async fn prometheus_remote_read(
133140
);
134141

135142
for matcher in &query.matchers {
143+
println!(
144+
"[DEBUG PROM READ] Matcher: {}={} (type={})",
145+
matcher.name, matcher.value, matcher.r#type
146+
);
136147
debug!(
137148
" Matcher: {}={} (type={})",
138149
matcher.name, matcher.value, matcher.r#type
@@ -226,7 +237,9 @@ async fn handle_streamed_response(
226237
state: &HttpServerState,
227238
read_request: &crate::parsing::prometheus::remote_read_models::ReadRequest,
228239
) -> Result<Response<axum::body::Body>, AppError> {
229-
let mut chunked_responses = Vec::with_capacity(read_request.queries.len());
240+
// Prometheus expects ONE ChunkedReadResponse per series, not one per query
241+
// Each message contains exactly one series
242+
let mut chunked_responses = Vec::new();
230243

231244
for (query_index, query) in read_request.queries.iter().enumerate() {
232245
// Convert Prometheus matchers to SensApp matchers
@@ -245,36 +258,88 @@ async fn handle_streamed_response(
245258
AppError::internal_server_error(anyhow::anyhow!("Storage query failed: {}", e))
246259
})?;
247260

261+
println!(
262+
"[DEBUG PROM READ] Query {} returned {} sensors from storage",
263+
query_index,
264+
sensor_data.len()
265+
);
248266
debug!(
249267
"Query {} returned {} sensors",
250268
query_index,
251269
sensor_data.len()
252270
);
253271

254-
// Convert to ChunkedSeries
255-
let chunked_series: Vec<_> = sensor_data
256-
.iter()
257-
.filter_map(|sd| {
258-
// Extract labels
259-
let labels = build_prometheus_labels(&sd.sensor);
272+
// Log each sensor found
273+
for sd in &sensor_data {
274+
println!(
275+
"[DEBUG PROM READ] Sensor: {} (type={:?}, {} samples)",
276+
sd.sensor.name,
277+
sd.sensor.sensor_type,
278+
sd.samples.len()
279+
);
280+
}
260281

261-
// Extract samples as Prometheus format
262-
let samples = extract_prom_samples_for_chunks(sd)?;
282+
// Convert each sensor to a separate ChunkedReadResponse
283+
// Prometheus expects ONE series per response message!
284+
for sd in &sensor_data {
285+
// Extract labels
286+
let labels = build_prometheus_labels(&sd.sensor);
287+
println!(
288+
"[DEBUG PROM READ] Labels for sensor {}: {:?}",
289+
sd.sensor.name,
290+
labels
291+
.iter()
292+
.map(|l| format!("{}={}", l.name, l.value))
293+
.collect::<Vec<_>>()
294+
);
263295

264-
// Encode as XOR chunks
265-
ChunkEncoder::encode_series(labels, samples).ok()
266-
})
267-
.collect();
296+
// Extract samples as Prometheus format
297+
let samples = match extract_prom_samples_for_chunks(sd) {
298+
Some(s) => s,
299+
None => continue, // Skip non-numeric types
300+
};
301+
302+
println!(
303+
"[DEBUG PROM READ] Encoding {} samples for sensor {} (first ts: {}, last ts: {})",
304+
samples.len(),
305+
sd.sensor.name,
306+
samples.first().map(|s| s.timestamp).unwrap_or(0),
307+
samples.last().map(|s| s.timestamp).unwrap_or(0)
308+
);
309+
310+
// Encode as XOR chunks
311+
let chunked_series = match ChunkEncoder::encode_series(labels, samples) {
312+
Ok(cs) => cs,
313+
Err(_) => continue, // Skip on encoding error
314+
};
315+
316+
// Create ONE response per series (this is what Prometheus expects!)
317+
let chunked_response =
318+
ChunkEncoder::create_response(query_index as i64, vec![chunked_series]);
319+
chunked_responses.push(chunked_response);
320+
}
268321

269-
let chunked_response = ChunkEncoder::create_response(query_index as i64, chunked_series);
270-
chunked_responses.push(chunked_response);
322+
println!(
323+
"[DEBUG PROM READ] Query {} produced {} chunked_responses (one per series)",
324+
query_index,
325+
chunked_responses.len()
326+
);
271327
}
272328

329+
println!(
330+
"[DEBUG PROM READ] Total {} chunked_responses to write",
331+
chunked_responses.len()
332+
);
333+
273334
// Create the streaming response body
274335
let body = StreamWriter::create_stream_body(&chunked_responses).map_err(|e| {
275336
AppError::internal_server_error(anyhow::anyhow!("Failed to create stream body: {}", e))
276337
})?;
277338

339+
println!(
340+
"[DEBUG PROM READ] Returning STREAMED_XOR_CHUNKS response with {} bytes",
341+
body.len()
342+
);
278343
info!(
279344
"Prometheus remote read: Returning STREAMED_XOR_CHUNKS response with {} bytes",
280345
body.len()

‎src/parsing/prometheus/chunk_encoder.rs‎

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
use super::remote_read_models::{Chunk as ProtoChunk, ChunkedReadResponse, ChunkedSeries, chunk};
22
use super::remote_write_models::{Label, Sample};
33
use anyhow::Result;
4-
use rusty_chunkenc::chunk::Chunk;
5-
use rusty_chunkenc::xor::XORSample;
4+
use rusty_chunkenc::xor::{XORChunk, XORSample};
65
use tracing::debug;
76

87
/// Encodes time series samples into XOR-compressed chunks for Prometheus remote read.
@@ -45,11 +44,12 @@ impl ChunkEncoder {
4544
let max_time_ms = samples.last().map(|s| s.timestamp).unwrap_or(0);
4645

4746
// Create the XOR chunk
48-
let chunk = Chunk::new_xor(xor_samples);
47+
let xor_chunk = XORChunk::new(xor_samples);
4948

50-
// Encode the chunk to bytes
49+
// Encode just the raw XOR chunk data (NOT the full chunk format with length/type/crc)
50+
// Prometheus remote read expects raw XOR data starting with 2-byte BE sample count
5151
let mut encoded_data = Vec::new();
52-
chunk.write(&mut encoded_data)?;
52+
xor_chunk.write(&mut encoded_data)?;
5353

5454
debug!(
5555
"Encoded chunk: {} samples into {} bytes (time range: {}ms - {}ms)",

‎src/parsing/prometheus/remote_read_parser.rs‎

Lines changed: 1 addition & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::common::decompress_snappy;
2-
use super::remote_read_models::{QueryResult, ReadRequest, ReadResponse};
2+
use super::remote_read_models::{ReadRequest, ReadResponse};
33
use anyhow::Result;
44
use prost::Message;
55
use std::io::Cursor;
@@ -47,19 +47,6 @@ pub fn parse_remote_read_request(input: &[u8]) -> Result<ReadRequest> {
4747
Ok(request)
4848
}
4949

50-
pub fn create_empty_read_response(request: &ReadRequest) -> Result<ReadResponse> {
51-
// Create an empty response with the same number of query results as queries
52-
let results = request
53-
.queries
54-
.iter()
55-
.map(|_| QueryResult {
56-
timeseries: vec![], // Empty timeseries for now
57-
})
58-
.collect();
59-
60-
Ok(ReadResponse { results })
61-
}
62-
6350
pub fn serialize_read_response(response: &ReadResponse) -> Result<Vec<u8>> {
6451
let encoded = response.encode_to_vec();
6552
debug!("Encoded ReadResponse to {} bytes", encoded.len());
@@ -136,32 +123,6 @@ mod tests {
136123
assert_eq!(output.accepted_response_types.len(), 1);
137124
}
138125

139-
#[test]
140-
fn test_create_empty_read_response() {
141-
let request = ReadRequest {
142-
queries: vec![
143-
Query {
144-
start_timestamp_ms: 1000,
145-
end_timestamp_ms: 2000,
146-
matchers: vec![],
147-
hints: None,
148-
},
149-
Query {
150-
start_timestamp_ms: 3000,
151-
end_timestamp_ms: 4000,
152-
matchers: vec![],
153-
hints: None,
154-
},
155-
],
156-
accepted_response_types: vec![],
157-
};
158-
159-
let response = create_empty_read_response(&request).unwrap();
160-
assert_eq!(response.results.len(), 2);
161-
assert_eq!(response.results[0].timeseries.len(), 0);
162-
assert_eq!(response.results[1].timeseries.len(), 0);
163-
}
164-
165126
#[test]
166127
fn test_serialize_read_response() {
167128
let response = ReadResponse {

0 commit comments

Comments
 (0)