Skip to content

Commit 0737ee4

Browse files
authored
Merge pull request #29 from shikoku1/purgeprops
Add purge properties logic
2 parents d0cad9a + 473f2ad commit 0737ee4

File tree

2 files changed

+53
-3
lines changed

2 files changed

+53
-3
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ async-trait = "0.1.51"
3636
uuid = {version="0.8.2", features = ["v5", "v4"] }
3737
base64 = "0.13.0"
3838
webpki = "0.21.4"
39+
flate2 = "1.0"
3940

4041
[dev-dependencies]
4142
structopt = "0.3"

src/lib.rs

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,13 +208,13 @@ impl AstarteSdk {
208208
let topic = parse_topic(&p.topic);
209209

210210
if let Some((_, _, interface, path)) = topic {
211+
let bdata = p.payload.to_vec();
212+
211213
if interface == "control" && path == "/consumer/properties" {
212-
// TODO: implement consumer purge properties
214+
self.purge_properties(bdata).await?;
213215
continue;
214216
}
215217

216-
let bdata = p.payload.to_vec();
217-
218218
debug!("Incoming publish = {} {:?}", p.topic, bdata);
219219

220220
if let Some(database) = &self.database {
@@ -280,6 +280,25 @@ impl AstarteSdk {
280280
format!("{}/{}", self.realm, self.device_id)
281281
}
282282

283+
async fn purge_properties(&self, bdata: Vec<u8>) -> Result<(), AstarteError> {
284+
if let Some(db) = &self.database {
285+
let stored_props = db.load_all_props().await?;
286+
287+
let paths = utils::extract_set_properties(&bdata);
288+
289+
for stored_prop in stored_props {
290+
if paths.contains(&(stored_prop.interface.clone() + &stored_prop.path)) {
291+
continue;
292+
}
293+
294+
db.delete_prop(&stored_prop.interface, &stored_prop.path)
295+
.await?;
296+
}
297+
}
298+
299+
Ok(())
300+
}
301+
283302
async fn send_emptycache(&self) -> Result<(), AstarteError> {
284303
let url = self.client_id() + "/control/emptyCache";
285304
debug!("sending emptyCache to {}", url);
@@ -709,6 +728,19 @@ impl fmt::Debug for AstarteSdk {
709728
}
710729
}
711730

731+
mod utils {
732+
pub fn extract_set_properties(bdata: &[u8]) -> Vec<String> {
733+
use flate2::read::ZlibDecoder;
734+
use std::io::prelude::*;
735+
736+
let mut d = ZlibDecoder::new(&bdata[4..]);
737+
let mut s = String::new();
738+
d.read_to_string(&mut s).unwrap();
739+
740+
s.split(';').map(|x| x.to_owned()).collect()
741+
}
742+
}
743+
712744
#[cfg(test)]
713745
mod test {
714746
use chrono::{TimeZone, Utc};
@@ -757,4 +789,21 @@ mod test {
757789
assert!(interface == "com.interface.test");
758790
assert!(path == "/led/red");
759791
}
792+
793+
#[test]
794+
fn test_deflate() {
795+
let example = b"com.example.MyInterface/some/path;org.example.DraftInterface/otherPath";
796+
797+
let bdata: Vec<u8> = vec![
798+
0x00, 0x00, 0x00, 0x46, 0x78, 0x9c, 0x4b, 0xce, 0xcf, 0xd5, 0x4b, 0xad, 0x48, 0xcc,
799+
0x2d, 0xc8, 0x49, 0xd5, 0xf3, 0xad, 0xf4, 0xcc, 0x2b, 0x49, 0x2d, 0x4a, 0x4b, 0x4c,
800+
0x4e, 0xd5, 0x2f, 0xce, 0xcf, 0x4d, 0xd5, 0x2f, 0x48, 0x2c, 0xc9, 0xb0, 0xce, 0x2f,
801+
0x4a, 0x87, 0xab, 0x70, 0x29, 0x4a, 0x4c, 0x2b, 0x41, 0x28, 0xca, 0x2f, 0xc9, 0x48,
802+
0x2d, 0x0a, 0x00, 0x2a, 0x02, 0x00, 0xb2, 0x0c, 0x1a, 0xc9,
803+
];
804+
805+
let s = crate::utils::extract_set_properties(&bdata);
806+
807+
assert!(s.join(";").as_bytes() == example);
808+
}
760809
}

0 commit comments

Comments
 (0)