@@ -154,13 +154,13 @@ impl AstarteSdk {
154154 let topic = parse_topic ( & p. topic ) ;
155155
156156 if let Some ( ( _, _, interface, path) ) = topic {
157+ let bdata = p. payload . to_vec ( ) ;
158+
157159 if interface == "control" && path == "/consumer/properties" {
158- // TODO: implement consumer purge properties
160+ self . purge_properties ( bdata ) . await ? ;
159161 continue ;
160162 }
161163
162- let bdata = p. payload . to_vec ( ) ;
163-
164164 debug ! ( "Incoming publish = {} {:?}" , p. topic, bdata) ;
165165
166166 if let Some ( database) = & self . database {
@@ -226,6 +226,25 @@ impl AstarteSdk {
226226 format ! ( "{}/{}" , self . realm, self . device_id)
227227 }
228228
229+ async fn purge_properties ( & self , bdata : Vec < u8 > ) -> Result < ( ) , AstarteError > {
230+ if let Some ( db) = & self . database {
231+ let stored_props = db. load_all_props ( ) . await ?;
232+
233+ let paths = utils:: extract_set_properties ( & bdata) ;
234+
235+ for stored_prop in stored_props {
236+ if paths. contains ( & ( stored_prop. interface . clone ( ) + & stored_prop. path ) ) {
237+ continue ;
238+ }
239+
240+ db. delete_prop ( & stored_prop. interface , & stored_prop. path )
241+ . await ?;
242+ }
243+ }
244+
245+ Ok ( ( ) )
246+ }
247+
229248 async fn send_emptycache ( & self ) -> Result < ( ) , AstarteError > {
230249 let url = self . client_id ( ) + "/control/emptyCache" ;
231250 debug ! ( "sending emptyCache to {}" , url) ;
@@ -655,6 +674,19 @@ impl fmt::Debug for AstarteSdk {
655674 }
656675}
657676
677+ mod utils {
678+ pub fn extract_set_properties ( bdata : & [ u8 ] ) -> Vec < String > {
679+ use flate2:: read:: ZlibDecoder ;
680+ use std:: io:: prelude:: * ;
681+
682+ let mut d = ZlibDecoder :: new ( & bdata[ 4 ..] ) ;
683+ let mut s = String :: new ( ) ;
684+ d. read_to_string ( & mut s) . unwrap ( ) ;
685+
686+ s. split ( ';' ) . map ( |x| x. to_owned ( ) ) . collect ( )
687+ }
688+ }
689+
658690#[ cfg( test) ]
659691mod test {
660692 use chrono:: { TimeZone , Utc } ;
@@ -703,4 +735,21 @@ mod test {
703735 assert ! ( interface == "com.interface.test" ) ;
704736 assert ! ( path == "/led/red" ) ;
705737 }
738+
739+ #[ test]
740+ fn test_deflate ( ) {
741+ let example = b"com.example.MyInterface/some/path;org.example.DraftInterface/otherPath" ;
742+
743+ let bdata: Vec < u8 > = vec ! [
744+ 0x00 , 0x00 , 0x00 , 0x46 , 0x78 , 0x9c , 0x4b , 0xce , 0xcf , 0xd5 , 0x4b , 0xad , 0x48 , 0xcc ,
745+ 0x2d , 0xc8 , 0x49 , 0xd5 , 0xf3 , 0xad , 0xf4 , 0xcc , 0x2b , 0x49 , 0x2d , 0x4a , 0x4b , 0x4c ,
746+ 0x4e , 0xd5 , 0x2f , 0xce , 0xcf , 0x4d , 0xd5 , 0x2f , 0x48 , 0x2c , 0xc9 , 0xb0 , 0xce , 0x2f ,
747+ 0x4a , 0x87 , 0xab , 0x70 , 0x29 , 0x4a , 0x4c , 0x2b , 0x41 , 0x28 , 0xca , 0x2f , 0xc9 , 0x48 ,
748+ 0x2d , 0x0a , 0x00 , 0x2a , 0x02 , 0x00 , 0xb2 , 0x0c , 0x1a , 0xc9 ,
749+ ] ;
750+
751+ let s = crate :: utils:: extract_set_properties ( & bdata) ;
752+
753+ assert ! ( s. join( ";" ) . as_bytes( ) == example) ;
754+ }
706755}
0 commit comments