Skip to content

Commit 02c0c2f

Browse files
authored
Merge pull request astarte-platform#439 from rgallor/refactor/timestamp
refactor(timestamp): reflect proto timestamp changes
2 parents 626626a + 2b83db7 commit 02c0c2f

File tree

4 files changed

+77
-70
lines changed

4 files changed

+77
-70
lines changed

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ rustc-args = ["--cfg=docsrs"]
134134
[workspace.dependencies]
135135
astarte-device-sdk = { path = "./", version = "=0.9.6" }
136136
astarte-device-sdk-derive = { version = "=0.9.6", path = "./astarte-device-sdk-derive" }
137-
astarte-message-hub-proto = { git = "https://github.com/astarte-platform/astarte-message-hub-proto", rev = "29c9d6f99c2c1fa004a53d51f287d7e0ba342137" }
138-
astarte-message-hub-proto-mock = { git = "https://github.com/astarte-platform/astarte-message-hub-proto", rev = "29c9d6f99c2c1fa004a53d51f287d7e0ba342137" }
137+
astarte-message-hub-proto = { git = "https://github.com/astarte-platform/astarte-message-hub-proto", rev = "8e8198de9589c606e63ada8ec104bace81ae9cfa" }
138+
astarte-message-hub-proto-mock = { git = "https://github.com/astarte-platform/astarte-message-hub-proto", rev = "8e8198de9589c606e63ada8ec104bace81ae9cfa" }
139139
async-trait = "0.1.67"
140140
base64 = "0.22.0"
141141
bson = "2.7.0"

src/transport/grpc/convert.rs

Lines changed: 43 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -229,38 +229,40 @@ impl TryFrom<MessageHubEvent> for ReceivedEvent<GrpcPayload> {
229229
.payload
230230
.ok_or(MessageHubProtoError::ExpectedField("payload"))?;
231231

232-
let timestamp = message.timestamp.map(convert_timestamp).transpose()?;
233-
234232
Ok(ReceivedEvent {
235233
interface: message.interface_name,
236234
path: message.path,
237-
payload: GrpcPayload::new(payload, timestamp),
235+
payload: GrpcPayload::new(payload),
238236
})
239237
}
240238
}
241239

242240
impl From<ValidatedIndividual> for astarte_message_hub_proto::AstarteMessage {
243241
fn from(value: ValidatedIndividual) -> Self {
244-
let timestamp = value.timestamp.map(|t| t.into());
242+
let timestamp = value
243+
.timestamp
244+
.map(astarte_message_hub_proto::pbjson_types::Timestamp::from);
245245

246246
let payload = Some(ProtoPayload::DatastreamIndividual(
247247
AstarteDatastreamIndividual {
248248
data: Some(value.data.into()),
249+
timestamp,
249250
},
250251
));
251252

252253
astarte_message_hub_proto::AstarteMessage {
253254
interface_name: value.interface,
254255
path: value.path,
255-
timestamp,
256256
payload,
257257
}
258258
}
259259
}
260260

261261
impl From<ValidatedObject> for astarte_message_hub_proto::AstarteMessage {
262262
fn from(value: ValidatedObject) -> Self {
263-
let timestamp = value.timestamp.map(|t| t.into());
263+
let timestamp = value
264+
.timestamp
265+
.map(astarte_message_hub_proto::pbjson_types::Timestamp::from);
264266

265267
let data = value
266268
.data
@@ -269,13 +271,12 @@ impl From<ValidatedObject> for astarte_message_hub_proto::AstarteMessage {
269271
.collect();
270272

271273
let payload = Some(ProtoPayload::DatastreamObject(
272-
astarte_message_hub_proto::AstarteDatastreamObject { data },
274+
astarte_message_hub_proto::AstarteDatastreamObject { data, timestamp },
273275
));
274276

275277
astarte_message_hub_proto::AstarteMessage {
276278
interface_name: value.interface,
277279
path: value.path,
278-
timestamp,
279280
payload,
280281
}
281282
}
@@ -286,7 +287,6 @@ impl From<ValidatedUnset> for astarte_message_hub_proto::AstarteMessage {
286287
Self {
287288
interface_name: value.interface,
288289
path: value.path,
289-
timestamp: None,
290290
payload: Some(ProtoPayload::PropertyIndividual(
291291
astarte_message_hub_proto::AstartePropertyIndividual { data: None },
292292
)),
@@ -345,16 +345,18 @@ impl TryFrom<ProtoPayload> for Value {
345345
// Individual
346346
ProtoPayload::DatastreamIndividual(AstarteDatastreamIndividual {
347347
data: Some(data),
348+
timestamp: _,
348349
})
349350
| ProtoPayload::PropertyIndividual(AstartePropertyIndividual { data: Some(data) }) => {
350351
let value = data.try_into()?;
351352

352353
Ok(Value::Individual(value))
353354
}
354355
// Individual error case
355-
ProtoPayload::DatastreamIndividual(AstarteDatastreamIndividual { data: None }) => {
356-
Err(MessageHubProtoError::ExpectedField("data"))
357-
}
356+
ProtoPayload::DatastreamIndividual(AstarteDatastreamIndividual {
357+
data: None,
358+
timestamp: _,
359+
}) => Err(MessageHubProtoError::ExpectedField("data")),
358360
// Object
359361
ProtoPayload::DatastreamObject(object) => {
360362
let value = AstarteObject::try_from(object)?;
@@ -369,65 +371,57 @@ impl TryFrom<ProtoPayload> for Value {
369371
// because the AstarteMessage is from the proto crate, while the AstarteDeviceDataEvent is ours.
370372
impl From<DeviceEvent> for astarte_message_hub_proto::AstarteMessage {
371373
fn from(value: DeviceEvent) -> Self {
372-
let payload: ProtoPayload = value.data.into();
373-
374-
astarte_message_hub_proto::AstarteMessage {
375-
interface_name: value.interface,
376-
path: value.path,
377-
timestamp: None,
378-
payload: Some(payload),
379-
}
380-
}
381-
}
382-
383-
// TODO this is incomplete, no way of creating a PropertyIndividual with a set value
384-
// refactor value to include a property type or find a way of stashing that information
385-
// somewhere
386-
impl From<Value> for ProtoPayload {
387-
fn from(value: Value) -> Self {
388-
match value {
374+
let payload = match value.data {
389375
Value::Individual(val) => {
390376
ProtoPayload::DatastreamIndividual(AstarteDatastreamIndividual {
391377
data: Some(val.into()),
378+
// FIXME: how do i retrieve the timestamp if the Value type doesn't have it?
379+
timestamp: None,
392380
})
393381
}
394382
Value::Object(val) => {
395383
let data = val.inner.into_iter().map(|(k, v)| (k, v.into())).collect();
396384

397385
ProtoPayload::DatastreamObject(astarte_message_hub_proto::AstarteDatastreamObject {
398386
data,
387+
// FIXME: how do i retrieve the timestamp if the Value type doesn't have it?
388+
timestamp: None,
399389
})
400390
}
401391
Value::Unset => ProtoPayload::PropertyIndividual(
402392
astarte_message_hub_proto::AstartePropertyIndividual { data: None },
403393
),
394+
};
395+
396+
astarte_message_hub_proto::AstarteMessage {
397+
interface_name: value.interface,
398+
path: value.path,
399+
payload: Some(payload),
404400
}
405401
}
406402
}
407403

408404
#[cfg(test)]
409405
pub(crate) mod test {
406+
use std::collections::HashMap;
407+
410408
use astarte_message_hub_proto::{
411409
AstarteData, AstarteDatastreamObject, AstarteMessage, AstartePropertyIndividual,
412410
InterfaceProperties,
413411
};
414412
use chrono::Utc;
415413
use pretty_assertions::assert_eq;
416414

417-
use crate::Timestamp;
418-
419415
use super::*;
420416

421417
pub(crate) fn new_astarte_message(
422418
interface_name: String,
423419
path: String,
424-
timestamp: Option<Timestamp>,
425420
payload: ProtoPayload,
426421
) -> AstarteMessage {
427422
AstarteMessage {
428423
interface_name,
429424
path,
430-
timestamp: timestamp.map(pbjson_types::Timestamp::from),
431425
payload: Some(payload),
432426
}
433427
}
@@ -482,12 +476,12 @@ pub(crate) mod test {
482476
let payload: ProtoPayload =
483477
ProtoPayload::DatastreamIndividual(AstarteDatastreamIndividual {
484478
data: Some(astarte_type.into()),
479+
timestamp: None,
485480
});
486481

487482
let astarte_message = AstarteMessage {
488483
interface_name: interface_name.clone(),
489484
path: interface_path.clone(),
490-
timestamp: None,
491485
payload: Some(payload),
492486
};
493487

@@ -504,13 +498,12 @@ pub(crate) mod test {
504498
let interface_name = "test.name.json".to_string();
505499
let interface_path = "test".to_string();
506500

507-
let astarte_type = Value::Unset;
508-
let payload: ProtoPayload = astarte_type.into();
501+
let payload: ProtoPayload =
502+
ProtoPayload::PropertyIndividual(AstartePropertyIndividual { data: None });
509503

510504
let astarte_message = AstarteMessage {
511505
interface_name: interface_name.clone(),
512506
path: interface_path.clone(),
513-
timestamp: None,
514507
payload: Some(payload),
515508
};
516509

@@ -971,6 +964,7 @@ pub(crate) mod test {
971964
let payload: ProtoPayload =
972965
ProtoPayload::DatastreamIndividual(AstarteDatastreamIndividual {
973966
data: Some(astarte_sdk_type_double.into()),
967+
timestamp: None,
974968
});
975969

976970
let double_value = take_individual(payload)
@@ -997,13 +991,20 @@ pub(crate) mod test {
997991

998992
#[test]
999993
fn from_sdk_astarte_aggregate_to_astarte_message_payload_success() {
994+
use astarte_message_hub_proto::astarte_data::AstarteData as ProtoData;
995+
use astarte_message_hub_proto::AstarteData;
996+
1000997
let expected_data: f64 = 15.5;
1001-
let astarte_type_map = Value::Object(AstarteObject::from_iter([(
1002-
"key1".to_string(),
1003-
AstarteType::Double(expected_data),
1004-
)]));
1005998

1006-
let payload_result: ProtoPayload = astarte_type_map.into();
999+
let payload_result = ProtoPayload::DatastreamObject(AstarteDatastreamObject {
1000+
data: HashMap::from([(
1001+
"key1".to_string(),
1002+
AstarteData {
1003+
astarte_data: Some(ProtoData::Double(expected_data)),
1004+
},
1005+
)]),
1006+
timestamp: None,
1007+
});
10071008

10081009
let double_data = take_object(payload_result)
10091010
.and_then(|mut obj| obj.data.remove("key1"))

src/transport/grpc/mod.rs

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -458,30 +458,30 @@ where
458458
mapping: &MappingRef<'_, &Interface>,
459459
payload: Self::Payload,
460460
) -> Result<Option<(AstarteType, Option<Timestamp>)>, TransportError> {
461-
let Self::Payload { data, timestamp } = payload;
461+
let value: Value = payload.data.try_into().map_err(RecvError::connection)?;
462462

463-
let value: Value = data.try_into().map_err(RecvError::connection)?;
463+
match value {
464+
Value::Individual(astarte_type) => {
465+
trace!("received {}", astarte_type.display_type());
466+
467+
// FIXME: replace None with the actual timestamp
468+
Ok(Some((astarte_type, None)))
469+
}
464470

465-
let astarte_type = match value {
466-
Value::Individual(astarte_type) => Ok(astarte_type),
467471
Value::Object(_hash_map) => {
468472
let aggr_err = AggregateError::for_payload(
469473
mapping.interface().interface_name(),
470474
mapping.path().to_string(),
471475
Aggregation::Individual,
472476
Aggregation::Object,
473477
);
474-
Err(RecvError::Aggregation(aggr_err))
478+
Err(RecvError::Aggregation(aggr_err).into())
475479
}
476480
Value::Unset => {
477481
debug!("unset received");
478-
return Ok(None);
482+
Ok(None)
479483
}
480-
}?;
481-
482-
trace!("received {}", astarte_type.display_type());
483-
484-
Ok(Some((astarte_type, timestamp)))
484+
}
485485
}
486486

487487
fn deserialize_object(
@@ -505,7 +505,8 @@ where
505505

506506
trace!("object received");
507507

508-
Ok((data, payload.timestamp))
508+
// FIXME: replace None with the actual timestamp
509+
Ok((data, None))
509510
}
510511
}
511512

@@ -551,12 +552,11 @@ where
551552
#[derive(Debug, Clone, PartialEq)]
552553
pub(crate) struct GrpcPayload {
553554
data: ProtoPayload,
554-
timestamp: Option<Timestamp>,
555555
}
556556

557557
impl GrpcPayload {
558-
pub(crate) fn new(data: ProtoPayload, timestamp: Option<Timestamp>) -> Self {
559-
Self { data, timestamp }
558+
pub(crate) fn new(data: ProtoPayload) -> Self {
559+
Self { data }
560560
}
561561
}
562562

@@ -658,7 +658,9 @@ mod test {
658658

659659
use astarte_message_hub_proto::tonic::Request;
660660
use astarte_message_hub_proto::AstarteMessage;
661-
use astarte_message_hub_proto::{pbjson_types, tonic};
661+
use astarte_message_hub_proto::{
662+
pbjson_types, tonic, AstarteDatastreamObject, AstartePropertyIndividual,
663+
};
662664
use astarte_message_hub_proto_mock::mockall::{predicate, Sequence};
663665
use itertools::Itertools;
664666
use uuid::uuid;
@@ -1127,13 +1129,17 @@ mod test {
11271129
const PATH: &str = "/1";
11281130
let interface = Interface::from_str(crate::test::E2E_SERVER_DATASTREAM).unwrap();
11291131
let interface_name = interface.interface_name().to_owned();
1130-
let expected_object = Value::Object(MockServerObject::mock_object());
1131-
let proto_payload: astarte_message_hub_proto::astarte_message::Payload =
1132-
expected_object.into();
1132+
let proto_payload = ProtoPayload::DatastreamObject(AstarteDatastreamObject {
1133+
data: MockServerObject::mock_object()
1134+
.into_key_values()
1135+
.map(|(k, v)| (k, v.into()))
1136+
.collect(),
1137+
timestamp: None,
1138+
});
1139+
11331140
let astarte_message = super::convert::test::new_astarte_message(
11341141
interface_name.clone(),
11351142
PATH.to_string(),
1136-
None,
11371143
proto_payload.clone(),
11381144
);
11391145

@@ -1178,11 +1184,11 @@ mod test {
11781184
const PATH: &str = "/1/enable";
11791185
let interface = Interface::from_str(crate::test::SERVER_PROPERTIES).unwrap();
11801186
let interface_name = interface.interface_name().to_owned();
1181-
let proto_payload: ProtoPayload = Value::Unset.into();
1187+
let proto_payload: ProtoPayload =
1188+
ProtoPayload::PropertyIndividual(AstartePropertyIndividual { data: None });
11821189
let astarte_message = super::convert::test::new_astarte_message(
11831190
interface_name.clone(),
11841191
PATH.to_string(),
1185-
None,
11861192
proto_payload.clone(),
11871193
);
11881194

0 commit comments

Comments
 (0)