Skip to content

Commit f0c6fe5

Browse files
authored
Merge pull request #35 from shikoku1/early_connack
Move connack handling inside new()
2 parents bf7ee16 + 84fa015 commit f0c6fe5

File tree

1 file changed

+36
-7
lines changed

1 file changed

+36
-7
lines changed

src/lib.rs

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use builder::AstarteOptions;
3434
use database::AstarteDatabase;
3535
use database::StoredProp;
3636
use itertools::Itertools;
37-
use log::{debug, error, trace};
37+
use log::{debug, error, info, trace};
3838
use rumqttc::{AsyncClient, Event};
3939
use rumqttc::{EventLoop, MqttOptions};
4040
use std::collections::HashMap;
@@ -132,7 +132,7 @@ impl AstarteSdk {
132132
// TODO: make cap configurable
133133
let (client, eventloop) = AsyncClient::new(mqtt_options.clone(), 50);
134134

135-
let device = AstarteSdk {
135+
let mut device = AstarteSdk {
136136
realm: opts.realm.to_owned(),
137137
device_id: opts.device_id.to_owned(),
138138
mqtt_options,
@@ -144,6 +144,8 @@ impl AstarteSdk {
144144

145145
device.subscribe(&cn).await?;
146146

147+
device.poll_connack().await?;
148+
147149
Ok(device)
148150
}
149151

@@ -174,6 +176,37 @@ impl AstarteSdk {
174176
Ok(())
175177
}
176178

179+
async fn poll_connack(&mut self) -> Result<(), AstarteError> {
180+
loop {
181+
// keep consuming and processing packets until we have data for the user
182+
match self.eventloop.lock().await.poll().await? {
183+
Event::Incoming(i) => {
184+
trace!("MQTT Incoming = {i:?}");
185+
186+
if let rumqttc::Packet::ConnAck(p) = i {
187+
return self.connack(p).await;
188+
} else {
189+
error!("BUG: not connack inside poll_connack {i:?}");
190+
}
191+
}
192+
Event::Outgoing(i) => {
193+
error!("BUG: not connack inside poll_connack {i:?}");
194+
}
195+
}
196+
}
197+
}
198+
199+
async fn connack(&self, p: rumqttc::ConnAck) -> Result<(), AstarteError> {
200+
if !p.session_present {
201+
self.send_introspection().await?;
202+
self.send_emptycache().await?;
203+
self.send_device_owned_properties().await?;
204+
info!("connack done");
205+
}
206+
207+
Ok(())
208+
}
209+
177210
/// Poll updates from mqtt, this is where you receive data
178211
/// ```no_run
179212
/// #[tokio::main]
@@ -198,11 +231,7 @@ impl AstarteSdk {
198231

199232
match i {
200233
rumqttc::Packet::ConnAck(p) => {
201-
if !p.session_present {
202-
self.send_introspection().await?;
203-
self.send_emptycache().await?;
204-
self.send_device_owned_properties().await?;
205-
}
234+
self.connack(p).await?;
206235
}
207236
rumqttc::Packet::Publish(p) => {
208237
let topic = parse_topic(&p.topic);

0 commit comments

Comments
 (0)