11use std:: time:: Duration ;
22
3+ use redis:: Commands ;
34use testcontainers_modules:: {
45 redis:: Redis ,
56 testcontainers:: { ContainerAsync , runners:: AsyncRunner } ,
@@ -18,9 +19,14 @@ async fn create_redis() -> (ContainerAsync<Redis>, String, u16) {
1819 ( r, h, p)
1920}
2021
21- fn config ( redis_url : String , redis_port : u16 , metrics_url : String ) -> IngestorConfig {
22+ fn config (
23+ redis_url : String ,
24+ redis_port : u16 ,
25+ metrics_url : String ,
26+ additional : & str ,
27+ ) -> IngestorConfig {
2228 toml:: from_str (
23- format ! (
29+ ( format ! (
2430 r#"enable_logging=false
2531
2632[redis]
@@ -46,8 +52,8 @@ watchdog_interval = {{ Secondly = 20 }}
4652publish_interval = {{ Millis = 5 }}
4753
4854"#
49- )
50- . as_str ( ) ,
55+ ) + additional )
56+ . as_str ( ) ,
5157 )
5258 . unwrap ( )
5359}
@@ -67,7 +73,7 @@ async fn test_system_metrics_propagated() {
6773 let ( redis_container, redis_url, redis_port) = create_redis ( ) . await ;
6874 let _ = redis_container. start ( ) . await ;
6975 let config: & ' static mut IngestorConfig =
70- Box :: leak ( Box :: new ( config ( redis_url, redis_port, server. url ( ) ) ) ) ;
76+ Box :: leak ( Box :: new ( config ( redis_url, redis_port, server. url ( ) , "" ) ) ) ;
7177
7278 let handle = spawn ( metrics_loop ( config) ) ;
7379 while !mock. matched_async ( ) . await {
@@ -81,3 +87,43 @@ async fn test_system_metrics_propagated() {
8187 . await
8288 . unwrap ( )
8389}
90+
91+ const ENCODED_METRIC : & [ u8 ; 539 ] = b"\x81 \xad __bec_codec__\x83 \xac encoder_name\xaa BECMessage\xa9 type_name\xb4 DynamicMetricMessage\xa4 data\x83 \xa8 metadata\x80 \xa7 metrics\x84 \xa1 a\x81 \xad __bec_codec__\x83 \xac encoder_name\xa9 BaseModel\xa9 type_name\xb6 _StrDynamicMetricValue\xa4 data\x82 \xa5 value\xa6 test_a\xa9 type_name\xa3 str\xa1 b\x81 \xad __bec_codec__\x83 \xac encoder_name\xa9 BaseModel\xa9 type_name\xb6 _IntDynamicMetricValue\xa4 data\x82 \xa5 value\x05 \xa9 type_name\xa3 int\xa1 c\x81 \xad __bec_codec__\x83 \xac encoder_name\xa9 BaseModel\xa9 type_name\xb8 _FloatDynamicMetricValue\xa4 data\x82 \xa5 value\xcb @#\xcc \xcc \xcc \xcc \xcc \xcd \xa9 type_name\xa5 float\xa1 d\x81 \xad __bec_codec__\x83 \xac encoder_name\xa9 BaseModel\xa9 type_name\xb7 _BoolDynamicMetricValue\xa4 data\x82 \xa5 value\xc2 \xa9 type_name\xa4 bool\xa9 timestamp\xcb A\xda l\xa4 \x90 \x93 \x8f \xca " ;
92+
93+ #[ tokio:: test( flavor = "multi_thread" ) ]
94+ async fn test_dynamic_metrics ( ) {
95+ let mut server = mockito:: Server :: new_async ( ) . await ;
96+ let mock = server. mock ( "POST" , "/" ) . expect ( 4 ) . create ( ) ;
97+
98+ let test = tokio:: time:: timeout ( Duration :: from_secs ( 10 ) , async {
99+ let ( redis_container, redis_url, redis_port) = create_redis ( ) . await ;
100+ let _ = redis_container. start ( ) . await ;
101+ let config: & ' static mut IngestorConfig = Box :: leak ( Box :: new ( config (
102+ redis_url,
103+ redis_port,
104+ server. url ( ) ,
105+ "
106+ [metrics.dynamic]
107+ dynamic_metric = { read_type = \" PubSub\" , key = \" info/dynamic_metrics/test\" }
108+ " ,
109+ ) ) ) ;
110+
111+ let handle = spawn ( metrics_loop ( config) ) ;
112+ tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
113+ let mut conn = redis:: Client :: open ( config. redis . url . full_url ( ) )
114+ . unwrap ( )
115+ . get_connection ( )
116+ . unwrap ( ) ;
117+ let _: Result < String , _ > = dbg ! ( conn. publish( "info/dynamic_metrics/test" , ENCODED_METRIC ) ) ;
118+ while !mock. matched_async ( ) . await {
119+ tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
120+ }
121+ handle. abort ( ) ;
122+ let _ = redis_container. stop_with_timeout ( Some ( 0 ) ) . await ;
123+ let _ = handle. await ;
124+ } )
125+ . await ;
126+
127+ mock. assert ( ) ; // check no more came after finishing
128+ test. unwrap ( ) ;
129+ }
0 commit comments