Skip to content

Commit 5d6d36e

Browse files
committed
refactor(connection): make the resend of the retention solid
Pull the packets while sending them, this is a limitation of the MQTT library. Signed-off-by: Joshua Chapman <joshua.chapman@secomind.com>
1 parent d56a954 commit 5d6d36e

File tree

15 files changed

+321
-229
lines changed

15 files changed

+321
-229
lines changed

astarte-device-sdk-derive/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ pub fn astarte_aggregate_derive(input: TokenStream) -> TokenStream {
301301
///
302302
/// ### Example
303303
///
304-
/// To derive the trait it for an individual.
304+
/// To derive the trait for an individual.
305305
///
306306
/// ```no_compile
307307
/// #[derive(FromEvent)]

examples/object_datastream/main.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
use std::time::Duration;
2020

21+
use chrono::Utc;
2122
use serde::{Deserialize, Serialize};
2223

2324
use astarte_device_sdk::IntoAstarteObject;
@@ -84,10 +85,11 @@ async fn main() -> eyre::Result<()> {
8485

8586
println!("Sending {data:?}");
8687
client
87-
.send_object(
88+
.send_object_with_timestamp(
8889
"org.astarte-platform.rust.examples.object-datastream.DeviceDatastream",
8990
"/23",
9091
data.try_into().unwrap(),
92+
Utc::now(),
9193
)
9294
.await?;
9395

src/client/individual.rs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use tracing::{debug, trace, warn};
2323
use crate::client::ValidatedIndividual;
2424
use crate::interface::mapping::path::MappingPath;
2525
use crate::interface::Retention;
26-
use crate::state::SharedState;
26+
use crate::state::{SharedState, Status};
2727
use crate::store::StoreCapabilities;
2828
use crate::transport::Connection;
2929
use crate::{AstarteType, Error};
@@ -51,16 +51,24 @@ where
5151

5252
debug!("sending individual type {}", validated.data.display_type());
5353

54-
if !self.state.status.is_connected() {
55-
trace!("publish individual while connection is offline");
56-
57-
return Self::offline_send_individual(
58-
&self.state,
59-
&self.store,
60-
&mut self.sender,
61-
validated,
62-
)
63-
.await;
54+
match self.state.status.connection() {
55+
Status::Connected => {
56+
trace!("publish individual while connection is online");
57+
}
58+
Status::Disconnected => {
59+
trace!("publish individual while connection is offline");
60+
61+
return Self::offline_send_individual(
62+
&self.state,
63+
&self.store,
64+
&mut self.sender,
65+
validated,
66+
)
67+
.await;
68+
}
69+
Status::Closed => {
70+
return Err(Error::Disconnected);
71+
}
6472
}
6573

6674
match mapping.retention() {

src/client/object.rs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::client::ValidatedObject;
2525
use crate::error::AggregationError;
2626
use crate::interface::mapping::path::MappingPath;
2727
use crate::interface::{Aggregation, Retention};
28-
use crate::state::SharedState;
28+
use crate::state::{SharedState, Status};
2929
use crate::store::StoreCapabilities;
3030
use crate::transport::Connection;
3131
use crate::Error;
@@ -66,16 +66,24 @@ where
6666

6767
debug!("sending object {}{}", interface_name, path);
6868

69-
if !self.state.status.is_connected() {
70-
trace!("publish object while connection is offline");
71-
72-
return Self::offline_send_object(
73-
&self.state,
74-
&self.store,
75-
&mut self.sender,
76-
validated,
77-
)
78-
.await;
69+
match self.state.status.connection() {
70+
Status::Connected => {
71+
trace!("publish object while connection is connected");
72+
}
73+
Status::Disconnected => {
74+
trace!("publish object while connection is offline");
75+
76+
return Self::offline_send_object(
77+
&self.state,
78+
&self.store,
79+
&mut self.sender,
80+
validated,
81+
)
82+
.await;
83+
}
84+
Status::Closed => {
85+
return Err(Error::Disconnected);
86+
}
7987
}
8088

8189
match validated.retention {

src/client/property.rs

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use tracing::{debug, error, trace};
2323
use crate::interface::mapping::path::MappingPath;
2424
use crate::interface::reference::PropertyRef;
2525
use crate::interface::Ownership;
26+
use crate::state::Status;
2627
use crate::store::{PropertyMapping, PropertyStore, StoredProp};
2728
use crate::transport::Connection;
2829
use crate::validate::{ValidatedProperty, ValidatedUnset};
@@ -70,13 +71,21 @@ where
7071
mapping.interface().version_major()
7172
);
7273

73-
if self.state.status.is_connected() {
74-
self.sender.send_property(validated).await?;
74+
match self.state.status.connection() {
75+
Status::Connected => {
76+
self.sender.send_property(validated).await?;
7577

76-
trace!(
77-
"property sent {interface_name}{path}:{}",
78-
mapping.interface().version_major()
79-
);
78+
trace!(
79+
"property sent {interface_name}{path}:{}",
80+
mapping.interface().version_major()
81+
);
82+
}
83+
Status::Disconnected => {
84+
trace!("property not sent since offline")
85+
}
86+
Status::Closed => {
87+
return Err(Error::Disconnected);
88+
}
8089
}
8190

8291
Ok(())
@@ -147,14 +156,20 @@ where
147156
let property_mapping = (&validated).into();
148157
self.store.unset_prop(&property_mapping).await?;
149158

150-
if self.state.status.is_connected() {
151-
self.sender.unset(validated.clone()).await?;
159+
match self.state.status.connection() {
160+
Status::Connected => {
161+
self.sender.unset(validated.clone()).await?;
152162

153-
debug!("deleting property {interface_name}{path} from store",);
163+
debug!("deleting property {interface_name}{path} from store");
154164

155-
// TODO: this should be done when the package has been acknowledged, but it's hard
156-
// for the MQTT implementation at the moment so we delete it here to cleanup
157-
self.store.delete_prop(&property_mapping).await?;
165+
self.store.delete_prop(&property_mapping).await?;
166+
}
167+
Status::Disconnected => {
168+
trace!("not deleting property from store, since disconnected");
169+
}
170+
Status::Closed => {
171+
return Err(Error::Disconnected);
172+
}
158173
}
159174

160175
Ok(())

src/connection/incoming.rs

Lines changed: 3 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ use crate::error::{AggregationError, InterfaceTypeError};
2323
use crate::interface::mapping::path::MappingPath;
2424
use crate::interface::{Aggregation, InterfaceTypeDef};
2525
use crate::store::{PropertyMapping, PropertyStore, StoredProp};
26-
use crate::transport::{Connection, Receive, ReceivedEvent, TransportError};
27-
use crate::{DeviceEvent, Error, Interface, Value};
26+
use crate::transport::{Connection, Receive, TransportError};
27+
use crate::{Error, Interface, Value};
2828

2929
use super::DeviceConnection;
3030

@@ -33,7 +33,7 @@ where
3333
C: Connection,
3434
{
3535
#[instrument(skip(self, payload))]
36-
async fn handle_event(
36+
pub(crate) async fn handle_event(
3737
&self,
3838
interface: &str,
3939
path: &str,
@@ -205,32 +205,4 @@ where
205205

206206
Ok(Value::Object { data, timestamp })
207207
}
208-
209-
pub(super) async fn handle_connection_event(
210-
&self,
211-
event: ReceivedEvent<C::Payload>,
212-
) -> Result<(), Error>
213-
where
214-
C: Receive + Sync,
215-
{
216-
let data = match self
217-
.handle_event(&event.interface, &event.path, event.payload)
218-
.await
219-
{
220-
Ok(aggregation) => Ok(DeviceEvent {
221-
interface: event.interface,
222-
path: event.path,
223-
data: aggregation,
224-
}),
225-
Err(TransportError::Recv(recv_err)) => Err(recv_err),
226-
Err(TransportError::Transport(err)) => {
227-
return Err(err);
228-
}
229-
};
230-
231-
self.tx
232-
.send_async(data)
233-
.await
234-
.map_err(|_| Error::Disconnected)
235-
}
236208
}

src/connection/mod.rs

Lines changed: 54 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,18 @@ use std::future::Future;
2222
use std::sync::Arc;
2323

2424
use chrono::Utc;
25-
use tracing::{debug, info, warn};
25+
use tokio::task::JoinHandle;
26+
use tracing::{debug, info, trace, warn};
2627

27-
use crate::state::SharedState;
28+
use crate::error::Report;
29+
use crate::state::{SharedState, Status};
2830
use crate::transport::TransportError;
2931
use crate::Timestamp;
3032
use crate::{
3133
client::RecvError,
3234
event::DeviceEvent,
3335
store::wrapper::StoreWrapper,
34-
transport::{Connection, Disconnect, Publish, Receive, Reconnect, Register},
36+
transport::{Connection, Publish, Receive, Reconnect},
3537
Error,
3638
};
3739

@@ -84,6 +86,7 @@ where
8486
connection: C,
8587
sender: C::Sender,
8688
state: Arc<SharedState>,
89+
resend: Option<JoinHandle<()>>,
8790
}
8891

8992
impl<C> DeviceConnection<C>
@@ -103,6 +106,7 @@ where
103106
state,
104107
connection,
105108
sender,
109+
resend: None,
106110
}
107111
}
108112

@@ -130,19 +134,61 @@ where
130134
}),
131135
}
132136
}
137+
138+
/// Keeps polling connection events
139+
pub(super) async fn poll(&mut self) -> Result<Status, TransportError>
140+
where
141+
C: Receive + Reconnect,
142+
C::Sender: Publish + 'static,
143+
{
144+
let Some(event) = self.connection.next_event().await? else {
145+
trace!("disconnected");
146+
147+
self.state.status.set_connected(false);
148+
149+
// This will check if the connection was closed
150+
return Ok(self.state.status.connection());
151+
};
152+
153+
let event = self
154+
.handle_event(&event.interface, &event.path, event.payload)
155+
.await
156+
.map(|data| DeviceEvent {
157+
interface: event.interface,
158+
path: event.path,
159+
data,
160+
})?;
161+
162+
self.tx.send(Ok(event)).map_err(|err| {
163+
debug!(error = %Report::new(err), "disconnected");
164+
165+
TransportError::Transport(Error::Disconnected)
166+
})?;
167+
168+
Ok(self.state.status.connection())
169+
}
133170
}
134171

135172
impl<C> EventLoop for DeviceConnection<C>
136173
where
137-
C: Connection + Reconnect + Receive + Send + Sync + 'static,
138-
C::Sender: Send + Register + Publish + Disconnect + 'static,
174+
C: Connection + Reconnect + Receive + 'static,
175+
C::Sender: Publish + 'static,
139176
{
140177
async fn handle_events(mut self) -> Result<(), crate::Error> {
141178
self.init_stored_retention().await?;
142179

180+
// We are connected and all the stored packet have been sent
181+
self.state.status.set_connected(true);
182+
143183
loop {
144-
let opt = match self.connection.next_event().await {
145-
Ok(opt) => opt,
184+
match self.poll().await {
185+
Ok(Status::Connected) => {}
186+
Ok(Status::Disconnected) => {
187+
self.reconnect_and_resend().await?;
188+
}
189+
Ok(Status::Closed) => {
190+
break;
191+
}
146192
Err(TransportError::Transport(err)) => {
147193
return Err(err);
148194
}
@@ -152,45 +198,8 @@ where
152198
.send_async(Err(recv_err))
153199
.await
154200
.map_err(|_| Error::Disconnected)?;
155-
156-
continue;
157201
}
158-
};
159-
160-
let Some(event_data) = opt else {
161-
if self.state.status.is_closed() {
162-
debug!("connection closed");
163-
164-
break;
165-
}
166-
167-
debug!("reconnecting");
168-
169-
self.state.status.set_connected(false);
170-
171-
let interfaces = self.state.interfaces.read().await;
172-
173-
self.connection.reconnect(&interfaces).await?;
174-
175-
Self::resend_volatile_publishes(
176-
&self.state.volatile_store,
177-
&mut self.sender,
178-
&self.state.status,
179-
)
180-
.await?;
181-
Self::resend_stored_publishes(
182-
&mut self.store,
183-
&mut self.sender,
184-
&self.state.status,
185-
)
186-
.await?;
187-
188-
self.state.status.set_connected(true);
189-
190-
continue;
191-
};
192-
193-
self.handle_connection_event(event_data).await?
202+
}
194203
}
195204

196205
info!("connection closed successfully");

0 commit comments

Comments
 (0)