Skip to content

Commit a28d2e8

Browse files
committed
feat: ⚡️ work on sensors fetching perfs
1 parent e9a0f95 commit a28d2e8

File tree

18 files changed

+1480
-42
lines changed

18 files changed

+1480
-42
lines changed

src/datamodel/sensapp_datetime.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,22 @@ mod tests {
5151
assert_send::<SensAppDateTime>();
5252
}
5353

54+
#[test]
55+
fn test_milliseconds_roundtrip() {
56+
// Test that from_unix_milliseconds_i64 -> to_unix_milliseconds roundtrips correctly
57+
let test_cases: &[i64] = &[
58+
1000, // Small value
59+
1704067200000, // Jan 1, 2024 00:00:00 UTC
60+
1704067200123, // With subsecond precision
61+
];
62+
63+
for &input_ms in test_cases {
64+
let epoch = SensAppDateTime::from_unix_milliseconds_i64(input_ms);
65+
let output_ms = epoch.to_unix_milliseconds().floor() as i64;
66+
assert_eq!(input_ms, output_ms, "from_unix_milliseconds_i64 should roundtrip for {}", input_ms);
67+
}
68+
}
69+
5470
#[test]
5571
fn test_sensapp_datetime_to_offset_datetime() {
5672
let hifitime_now = hifitime::Epoch::now().unwrap();

src/ingestors/http/prometheus_read.rs

Lines changed: 188 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
1+
use crate::datamodel::SensAppDateTime;
2+
use crate::datamodel::sensapp_datetime::SensAppDateTimeExt;
3+
use crate::parsing::prometheus::chunk_encoder::ChunkEncoder;
4+
use crate::parsing::prometheus::converter::{build_prometheus_labels, sensor_data_to_timeseries};
5+
use crate::parsing::prometheus::remote_read_models::{
6+
QueryResult, ReadResponse, read_request::ResponseType,
7+
};
18
use crate::parsing::prometheus::remote_read_parser::{
2-
create_empty_read_response, parse_remote_read_request, serialize_read_response,
9+
parse_remote_read_request, serialize_read_response,
310
};
11+
use crate::parsing::prometheus::remote_write_models::Sample as PromSample;
12+
use crate::parsing::prometheus::stream_writer::StreamWriter;
13+
use crate::storage::query::LabelMatcher;
414

515
use super::{app_error::AppError, state::HttpServerState};
616
use axum::{
@@ -10,7 +20,7 @@ use axum::{
1020
response::Response,
1121
};
1222
use tokio_util::bytes::Bytes;
13-
use tracing::{debug, info, warn};
23+
use tracing::{debug, info};
1424

1525
fn verify_read_headers(headers: &HeaderMap) -> Result<(), AppError> {
1626
// Check that we have the right content encoding, that must be snappy
@@ -93,7 +103,7 @@ fn verify_read_headers(headers: &HeaderMap) -> Result<(), AppError> {
93103
)]
94104
#[debug_handler]
95105
pub async fn prometheus_remote_read(
96-
State(_state): State<HttpServerState>,
106+
State(state): State<HttpServerState>,
97107
headers: HeaderMap,
98108
bytes: Bytes,
99109
) -> Result<Response<axum::body::Body>, AppError> {
@@ -139,33 +149,198 @@ pub async fn prometheus_remote_read(
139149
read_request.accepted_response_types
140150
);
141151

142-
// For now, create an empty response
143-
warn!("Returning empty response - data fetching not yet implemented");
144-
let empty_response = create_empty_read_response(&read_request).map_err(|e| {
145-
AppError::internal_server_error(anyhow::anyhow!("Failed to create response: {}", e))
146-
})?;
152+
// Check if client accepts streamed XOR chunks
153+
let use_streaming = read_request
154+
.accepted_response_types
155+
.contains(&(ResponseType::StreamedXorChunks as i32));
156+
157+
if use_streaming {
158+
// Return streamed chunked response
159+
handle_streamed_response(&state, &read_request).await
160+
} else {
161+
// Return standard SAMPLES response
162+
handle_samples_response(&state, &read_request).await
163+
}
164+
}
165+
166+
/// Handle standard SAMPLES response type
167+
async fn handle_samples_response(
168+
state: &HttpServerState,
169+
read_request: &crate::parsing::prometheus::remote_read_models::ReadRequest,
170+
) -> Result<Response<axum::body::Body>, AppError> {
171+
let mut results = Vec::with_capacity(read_request.queries.len());
172+
173+
for query in &read_request.queries {
174+
// Convert Prometheus matchers to SensApp matchers
175+
let matchers: Vec<LabelMatcher> = query.matchers.iter().map(LabelMatcher::from).collect();
176+
177+
// Convert timestamps from milliseconds to SensAppDateTime
178+
let start_time = SensAppDateTime::from_unix_milliseconds_i64(query.start_timestamp_ms);
179+
let end_time = SensAppDateTime::from_unix_milliseconds_i64(query.end_timestamp_ms);
180+
181+
// Query storage (numeric_only=true for Prometheus compatibility)
182+
let sensor_data = state
183+
.storage
184+
.query_sensors_by_labels(&matchers, Some(start_time), Some(end_time), None, true)
185+
.await
186+
.map_err(|e| {
187+
AppError::internal_server_error(anyhow::anyhow!("Storage query failed: {}", e))
188+
})?;
189+
190+
debug!("Query returned {} sensors", sensor_data.len());
191+
192+
// Convert to Prometheus TimeSeries
193+
let timeseries = sensor_data
194+
.iter()
195+
.filter_map(sensor_data_to_timeseries)
196+
.collect();
197+
198+
results.push(QueryResult { timeseries });
199+
}
200+
201+
let response = ReadResponse { results };
147202

148203
// Serialize and compress the response
149-
let response_bytes = serialize_read_response(&empty_response).map_err(|e| {
204+
let response_bytes = serialize_read_response(&response).map_err(|e| {
150205
AppError::internal_server_error(anyhow::anyhow!("Failed to serialize response: {}", e))
151206
})?;
152207

153208
info!(
154-
"Prometheus remote read: Returning response with {} bytes",
209+
"Prometheus remote read: Returning SAMPLES response with {} bytes",
155210
response_bytes.len()
156211
);
157212

158213
// Build HTTP response with appropriate headers
159-
let response = Response::builder()
214+
Response::builder()
160215
.status(StatusCode::OK)
161216
.header("content-type", "application/x-protobuf")
162217
.header("content-encoding", "snappy")
163218
.body(axum::body::Body::from(response_bytes))
164219
.map_err(|e| {
165220
AppError::internal_server_error(anyhow::anyhow!("Failed to build response: {}", e))
166-
})?;
221+
})
222+
}
223+
224+
/// Handle STREAMED_XOR_CHUNKS response type
225+
async fn handle_streamed_response(
226+
state: &HttpServerState,
227+
read_request: &crate::parsing::prometheus::remote_read_models::ReadRequest,
228+
) -> Result<Response<axum::body::Body>, AppError> {
229+
let mut chunked_responses = Vec::with_capacity(read_request.queries.len());
230+
231+
for (query_index, query) in read_request.queries.iter().enumerate() {
232+
// Convert Prometheus matchers to SensApp matchers
233+
let matchers: Vec<LabelMatcher> = query.matchers.iter().map(LabelMatcher::from).collect();
234+
235+
// Convert timestamps from milliseconds to SensAppDateTime
236+
let start_time = SensAppDateTime::from_unix_milliseconds_i64(query.start_timestamp_ms);
237+
let end_time = SensAppDateTime::from_unix_milliseconds_i64(query.end_timestamp_ms);
238+
239+
// Query storage (numeric_only=true for Prometheus compatibility)
240+
let sensor_data = state
241+
.storage
242+
.query_sensors_by_labels(&matchers, Some(start_time), Some(end_time), None, true)
243+
.await
244+
.map_err(|e| {
245+
AppError::internal_server_error(anyhow::anyhow!("Storage query failed: {}", e))
246+
})?;
247+
248+
debug!(
249+
"Query {} returned {} sensors",
250+
query_index,
251+
sensor_data.len()
252+
);
253+
254+
// Convert to ChunkedSeries
255+
let chunked_series: Vec<_> = sensor_data
256+
.iter()
257+
.filter_map(|sd| {
258+
// Extract labels
259+
let labels = build_prometheus_labels(&sd.sensor);
260+
261+
// Extract samples as Prometheus format
262+
let samples = extract_prom_samples_for_chunks(sd)?;
263+
264+
// Encode as XOR chunks
265+
ChunkEncoder::encode_series(labels, samples).ok()
266+
})
267+
.collect();
268+
269+
let chunked_response = ChunkEncoder::create_response(query_index as i64, chunked_series);
270+
chunked_responses.push(chunked_response);
271+
}
272+
273+
// Create the streaming response body
274+
let body = StreamWriter::create_stream_body(&chunked_responses).map_err(|e| {
275+
AppError::internal_server_error(anyhow::anyhow!("Failed to create stream body: {}", e))
276+
})?;
277+
278+
info!(
279+
"Prometheus remote read: Returning STREAMED_XOR_CHUNKS response with {} bytes",
280+
body.len()
281+
);
282+
283+
// Build HTTP response with appropriate headers for streaming
284+
Response::builder()
285+
.status(StatusCode::OK)
286+
.header(
287+
"content-type",
288+
"application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse",
289+
)
290+
.body(axum::body::Body::from(body))
291+
.map_err(|e| {
292+
AppError::internal_server_error(anyhow::anyhow!("Failed to build response: {}", e))
293+
})
294+
}
167295

168-
Ok(response)
296+
/// Extract Prometheus samples from SensorData for chunk encoding.
297+
/// Returns None for non-numeric types.
298+
fn extract_prom_samples_for_chunks(
299+
sensor_data: &crate::datamodel::SensorData,
300+
) -> Option<Vec<PromSample>> {
301+
use crate::datamodel::TypedSamples;
302+
303+
match &sensor_data.samples {
304+
TypedSamples::Float(samples) => {
305+
let prom_samples = samples
306+
.iter()
307+
.map(|s| PromSample {
308+
value: s.value,
309+
timestamp: s.datetime.to_unix_milliseconds().floor() as i64,
310+
})
311+
.collect();
312+
Some(prom_samples)
313+
}
314+
TypedSamples::Integer(samples) => {
315+
let prom_samples = samples
316+
.iter()
317+
.map(|s| PromSample {
318+
value: s.value as f64,
319+
timestamp: s.datetime.to_unix_milliseconds().floor() as i64,
320+
})
321+
.collect();
322+
Some(prom_samples)
323+
}
324+
TypedSamples::Numeric(samples) => {
325+
use rust_decimal::prelude::ToPrimitive;
326+
let prom_samples = samples
327+
.iter()
328+
.filter_map(|s| {
329+
s.value.to_f64().map(|value| PromSample {
330+
value,
331+
timestamp: s.datetime.to_unix_milliseconds().floor() as i64,
332+
})
333+
})
334+
.collect();
335+
Some(prom_samples)
336+
}
337+
// Non-numeric types cannot be represented in Prometheus format
338+
TypedSamples::String(_)
339+
| TypedSamples::Boolean(_)
340+
| TypedSamples::Location(_)
341+
| TypedSamples::Blob(_)
342+
| TypedSamples::Json(_) => None,
343+
}
169344
}
170345

171346
#[cfg(test)]

src/ingestors/http/server.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use axum::Router;
2222
use axum::extract::DefaultBodyLimit;
2323
use axum::extract::State;
2424
use axum::http::HeaderMap;
25+
use axum::http::StatusCode;
2526
use axum::http::header;
2627
use axum::routing::get;
2728
use axum::routing::post;
@@ -70,7 +71,10 @@ pub async fn run_http_server(state: HttpServerState, address: SocketAddr) -> Res
7071
.on_response(trace::DefaultOnResponse::new().level(Level::INFO)),
7172
)
7273
.sensitive_response_headers(sensitive_headers)
73-
.layer(TimeoutLayer::new(Duration::from_secs(timeout_seconds)))
74+
.layer(TimeoutLayer::with_status_code(
75+
StatusCode::REQUEST_TIMEOUT,
76+
Duration::from_secs(timeout_seconds),
77+
))
7478
.compression()
7579
.into_inner();
7680

src/ingestors/http/simple_promql.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,10 +262,16 @@ pub async fn simple_promql_query(
262262
// Parse and validate the query
263263
let parsed = parse_promql_query(&query.query)?;
264264

265-
// Execute the query
265+
// Execute the query (numeric_only=false since we support all types in export formats)
266266
let results = state
267267
.storage
268-
.query_sensors_by_labels(&parsed.matchers, parsed.start_time, parsed.end_time, None)
268+
.query_sensors_by_labels(
269+
&parsed.matchers,
270+
parsed.start_time,
271+
parsed.end_time,
272+
None,
273+
false,
274+
)
269275
.await?;
270276

271277
// Parse format from query parameter, default to SenML/JSON

0 commit comments

Comments
 (0)