1+ use chrono:: Utc ;
2+ use futures:: { SinkExt , StreamExt } ;
3+ use std:: collections:: BTreeMap ;
14use std:: sync:: Arc ;
25
36use pony:: metrics:: storage:: MetricStorage ;
47
58pub async fn handle_ws_client (
69 socket : warp:: ws:: WebSocket ,
710 node_id : uuid:: Uuid ,
8- metric : String ,
11+ series_hash : u64 ,
912 storage : Arc < MetricStorage > ,
1013) {
11- use futures:: { SinkExt , StreamExt } ;
1214 let ( mut ws_tx, _) = socket. split ( ) ;
1315 let mut ticker = tokio:: time:: interval ( std:: time:: Duration :: from_millis ( 1000 ) ) ;
1416
@@ -18,19 +20,71 @@ pub async fn handle_ws_client(
1820 let now_ms = chrono:: Utc :: now ( ) . timestamp_millis ( ) ;
1921 let ten_min_ago_ms = now_ms - ( 10 * 60 * 1000 ) ;
2022
21- let points = storage. get_range ( & node_id, & metric , ten_min_ago_ms, now_ms) ;
23+ let points = storage. get_range ( & node_id, series_hash , ten_min_ago_ms, now_ms) ;
2224
2325 if !points. is_empty ( ) {
24- let chart_points: Vec < serde_json:: Value > = points
25- . into_iter ( )
26- . map ( |p| serde_json:: json!( { "x" : p. timestamp, "y" : p. value } ) )
27- . collect ( ) ;
28-
29- if let Ok ( msg) = serde_json:: to_string ( & chart_points) {
30- if let Err ( e) = ws_tx. send ( warp:: ws:: Message :: text ( msg) ) . await {
31- log:: error!( "WS send error: {}" , e) ;
32- break ;
33- }
26+ let msg = serde_json:: json!( {
27+ "type" : "update" ,
28+ "node_id" : node_id,
29+ "hash" : series_hash,
30+ "data" : points. iter( ) . map( |p| ( p. timestamp, p. value) ) . collect:: <Vec <_>>( )
31+ } ) ;
32+
33+ if ws_tx
34+ . send ( warp:: ws:: Message :: text ( msg. to_string ( ) ) )
35+ . await
36+ . is_err ( )
37+ {
38+ break ;
39+ }
40+ }
41+ }
42+ }
43+
44+ pub async fn handle_aggregated_ws (
45+ socket : warp:: ws:: WebSocket ,
46+ tag_key : String ,
47+ tag_value : String ,
48+ metric_name : String ,
49+ storage : Arc < MetricStorage > ,
50+ ) {
51+ let ( mut ws_tx, _) = socket. split ( ) ;
52+ let mut ticker = tokio:: time:: interval ( std:: time:: Duration :: from_millis ( 1000 ) ) ;
53+
54+ loop {
55+ ticker. tick ( ) . await ;
56+
57+ let now = Utc :: now ( ) . timestamp_millis ( ) ;
58+ let aggregated_data =
59+ storage. get_aggregated_range ( & tag_key, & tag_value, & metric_name, now - 600_000 , now) ;
60+
61+ if !aggregated_data. is_empty ( ) {
62+ let response = aggregated_data
63+ . iter ( )
64+ . map ( |( id, points) | {
65+ (
66+ id,
67+ points
68+ . iter ( )
69+ . map ( |p| ( p. timestamp , p. value ) )
70+ . collect :: < Vec < _ > > ( ) ,
71+ )
72+ } )
73+ . collect :: < BTreeMap < _ , _ > > ( ) ;
74+
75+ let msg = serde_json:: json!( {
76+ "type" : "aggregated_update" ,
77+ "tag" : format!( "{}:{}" , tag_key, tag_value) ,
78+ "metric" : metric_name,
79+ "data" : response
80+ } ) ;
81+
82+ if ws_tx
83+ . send ( warp:: ws:: Message :: text ( msg. to_string ( ) ) )
84+ . await
85+ . is_err ( )
86+ {
87+ break ;
3488 }
3589 }
3690 }
0 commit comments