diff --git a/Cargo.lock b/Cargo.lock index 0e478c943..8329f3cd9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -177,10 +177,12 @@ version = "0.9.6" dependencies = [ "astarte-device-sdk-derive", "astarte-message-hub-proto", + "astarte-message-hub-proto-mock", "async-trait", "base64 0.22.1", "bson", "bytes", + "cfg-if", "chrono", "color-eyre", "env_logger", @@ -190,7 +192,7 @@ dependencies = [ "http", "itertools 0.11.0", "litemap", - "mockall", + "mockall 0.12.1", "mockito", "pretty_assertions", "rand_core 0.6.4", @@ -232,13 +234,13 @@ version = "0.9.6" dependencies = [ "astarte-device-sdk", "chrono", - "mockall", + "mockall 0.12.1", ] [[package]] name = "astarte-message-hub-proto" version = "0.7.0" -source = "git+https://github.com/astarte-platform/astarte-message-hub-proto?rev=27f386114528ceeeb81440879f259146fbed6052#27f386114528ceeeb81440879f259146fbed6052" +source = "git+https://github.com/astarte-platform/astarte-message-hub-proto?rev=20ba40215172613bfab149676adf0dbd9f3ccc92#20ba40215172613bfab149676adf0dbd9f3ccc92" dependencies = [ "chrono", "pbjson-types", @@ -250,6 +252,17 @@ dependencies = [ "uuid", ] +[[package]] +name = "astarte-message-hub-proto-mock" +version = "0.7.0" +source = "git+https://github.com/astarte-platform/astarte-message-hub-proto?rev=20ba40215172613bfab149676adf0dbd9f3ccc92#20ba40215172613bfab149676adf0dbd9f3ccc92" +dependencies = [ + "astarte-message-hub-proto", + "mockall 0.13.1", + "pbjson-types", + "tonic", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -1573,7 +1586,21 @@ dependencies = [ "downcast", "fragile", "lazy_static", - "mockall_derive", + "mockall_derive 0.12.1", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39a6bfcc6c8c7eed5ee98b9c3e33adc726054389233e201c95dab2d41a3839d2" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "mockall_derive 0.13.1", "predicates", "predicates-tree", ] @@ -1590,6 +1617,18 @@ dependencies = [ "syn", ] +[[package]] +name = "mockall_derive" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25ca3004c2efe9011bd4e461bd8256445052b9615405b4f7ea43fc8ca5c20898" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "mockito" version = "1.7.0" diff --git a/Cargo.toml b/Cargo.toml index 75b93b5f1..cffd49631 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,7 @@ astarte-message-hub-proto = { workspace = true, optional = true } base64 = { workspace = true } bson = { workspace = true, features = ["chrono-0_4"] } bytes = { workspace = true } +cfg-if = { workspace = true } chrono = { workspace = true, features = ["serde"] } flate2 = { workspace = true } flume = { workspace = true, features = ["async"] } @@ -98,6 +99,8 @@ reqwest = { workspace = true, features = ["macos-system-configuration"] } [dev-dependencies] astarte-device-sdk-derive = { workspace = true } +astarte-message-hub-proto = { workspace = true } +astarte-message-hub-proto-mock = { workspace = true } async-trait = { workspace = true } color-eyre = { workspace = true } env_logger = { workspace = true } @@ -131,11 +134,13 @@ rustc-args = ["--cfg=docsrs"] [workspace.dependencies] astarte-device-sdk = { path = "./", version = "=0.9.6" } astarte-device-sdk-derive = { version = "=0.9.6", path = "./astarte-device-sdk-derive" } -astarte-message-hub-proto = { git = "https://github.com/astarte-platform/astarte-message-hub-proto", rev = "27f386114528ceeeb81440879f259146fbed6052" } +astarte-message-hub-proto = { git = "https://github.com/astarte-platform/astarte-message-hub-proto", rev = "20ba40215172613bfab149676adf0dbd9f3ccc92" } +astarte-message-hub-proto-mock = { git = "https://github.com/astarte-platform/astarte-message-hub-proto", rev = "20ba40215172613bfab149676adf0dbd9f3ccc92" } async-trait = "0.1.67" base64 = "0.22.0" bson = "2.7.0" bytes = "1.5.0" +cfg-if = "1.0.0" chrono = "0.4.20" color-eyre = "0.6.3" env_logger = "0.11.0" diff --git a/src/lib.rs b/src/lib.rs index 0308a3361..5c4e9c435 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -108,6 +108,9 @@ mod test { pub(crate) const E2E_DEVICE_DATASTREAM: &str = include_str!( "../e2e-test/interfaces/org.astarte-platform.rust.e2etest.DeviceDatastream.json" ); + pub(crate) const E2E_SERVER_DATASTREAM: &str = include_str!( + "../e2e-test/interfaces/org.astarte-platform.rust.e2etest.ServerAggregate.json" + ); pub(crate) const E2E_DEVICE_AGGREGATE: &str = include_str!( "../e2e-test/interfaces/org.astarte-platform.rust.e2etest.DeviceAggregate.json" ); diff --git a/src/transport/grpc/convert.rs b/src/transport/grpc/convert.rs index 56d983180..f0ff14f88 100644 --- a/src/transport/grpc/convert.rs +++ b/src/transport/grpc/convert.rs @@ -415,6 +415,9 @@ impl From for astarte_message_hub_proto::AstarteMessage { } } +// TODO this is incomplete, no way of creating a PropertyIndividual with a set value +// refactor value to include a property type or find a way of stashing that information +// somewhere impl From for ProtoPayload { fn from(value: Value) -> Self { match value { @@ -463,6 +466,16 @@ pub(crate) mod test { } } + pub(crate) fn new_property( + path: String, + data: Option, + ) -> astarte_message_hub_proto::Property { + astarte_message_hub_proto::Property { + path, + data: data.map(|s| s.into()), + } + } + #[test] fn proto_astarte_double_into_astarte_device_sdk_type_success() { let value = 15.5; diff --git a/src/transport/grpc/mod.rs b/src/transport/grpc/mod.rs index 2f991af0b..4968e9dd5 100644 --- a/src/transport/grpc/mod.rs +++ b/src/transport/grpc/mod.rs @@ -29,16 +29,14 @@ use std::ops::Deref; use std::time::Duration; use astarte_message_hub_proto::prost::{DecodeError, Message}; -use astarte_message_hub_proto::tonic::codec::Streaming; use astarte_message_hub_proto::tonic::codegen::InterceptedService; use astarte_message_hub_proto::tonic::metadata::MetadataValue; use astarte_message_hub_proto::tonic::service::Interceptor; use astarte_message_hub_proto::tonic::transport::{Channel, Endpoint}; use astarte_message_hub_proto::tonic::{Request, Status}; use astarte_message_hub_proto::{ - astarte_message::Payload as ProtoPayload, message_hub_client::MessageHubClient, - pbjson_types::Empty, AstarteMessage, InterfacesJson, InterfacesName, MessageHubError, - MessageHubEvent, Node, + astarte_message::Payload as ProtoPayload, pbjson_types::Empty, AstarteMessage, InterfacesJson, + InterfacesName, MessageHubError, MessageHubEvent, Node, }; use bytes::Bytes; use sync_wrapper::SyncWrapper; @@ -118,6 +116,18 @@ impl From for GrpcError { } } +cfg_if::cfg_if! { + if #[cfg(test)] { + type MessageHubClient = astarte_message_hub_proto_mock::MockMessageHubClient; + type Streaming = astarte_message_hub_proto_mock::MockStreaming; + } + else { + type MessageHubClient = + astarte_message_hub_proto::message_hub_client::MessageHubClient; + type Streaming = astarte_message_hub_proto::tonic::codec::Streaming; + } +} + type MsgHubClient = MessageHubClient>; #[derive(Debug, Clone, Copy)] @@ -593,9 +603,7 @@ where } = config; let channel = self.endpoint.connect().await.map_err(GrpcError::from)?; - let node_id_interceptor = NodeIdInterceptor::new(self.uuid); - let mut client = MessageHubClient::with_interceptor(channel, node_id_interceptor); let node_data = NodeData::try_from(interfaces)?; @@ -646,788 +654,566 @@ impl<'a> TryFrom<&'a Interfaces> for NodeData { #[cfg(test)] mod test { - use std::{future::Future, net::SocketAddr, str::FromStr}; - - use astarte_message_hub_proto::{ - message_hub_server::{MessageHub, MessageHubServer}, - AstarteMessage, Property, PropertyIdentifier, StoredProperties, StoredPropertiesFilter, - }; - use async_trait::async_trait; - use tokio::{ - net::TcpListener, - sync::{mpsc, Mutex}, - }; + use std::str::FromStr; + + use astarte_message_hub_proto::tonic::Request; + use astarte_message_hub_proto::AstarteMessage; + use astarte_message_hub_proto::{pbjson_types, tonic}; + use astarte_message_hub_proto_mock::mockall::{predicate, Sequence}; + use itertools::Itertools; use uuid::uuid; - use crate::{ - aggregate::AstarteObject, - builder::DEFAULT_VOLATILE_CAPACITY, - store::memory::MemoryStore, - transport::{ - test::{mock_validate_individual, mock_validate_object}, - ReceivedEvent, - }, - DeviceEvent, Value, - }; + use crate::{aggregate::AstarteObject, builder::DEFAULT_VOLATILE_CAPACITY, DeviceEvent, Value}; use super::*; pub(crate) const ID: Uuid = uuid!("67e55044-10b1-426f-9247-bb680e5fe0c8"); - #[derive(Debug)] - pub(crate) enum ServerReceivedRequest { - Attach((Uuid, Node)), - Send(AstarteMessage), - Detach(Empty), - AddInterfaces(InterfacesJson), - RemoveInterfaces(InterfacesName), - GetProperties(InterfacesName), - GetAllProperties(StoredPropertiesFilter), - GetProperty(PropertyIdentifier), - } - - type ServerSenderValuesVec = Vec>; - - fn get_node_id_from_metadata( - request: &tonic::Request, - ) -> Result> { - // check only node-id-bin since the Node does not insert node-id metadata in the request - let Some(metadata_val) = request.metadata().get_bin("node-id-bin") else { - return Err(Box::new(Status::new( - tonic::Code::InvalidArgument, - "absent node id into metadata".to_string(), - ))); - }; - - let node_id_bytes = metadata_val - .to_bytes() - .map_err(|e| Status::new(tonic::Code::InvalidArgument, e.to_string()))?; + struct MockDeviceObject {} - Uuid::from_slice(&node_id_bytes) - .map_err(|e| Box::new(Status::new(tonic::Code::InvalidArgument, e.to_string()))) - } - - pub(crate) struct TestMessageHubServer { - /// This stream can be used to send test events that will be handled by the astarte device sdk code - /// and by the Grpc client. - /// Each received elements is a "session": the [`Vec`] received contains messages that will be sent - /// after the client attaches to the server. - /// Every successive [`Vec`] is only returned if the client reattaches to the server. - server_send: Mutex>, - /// This stream contains requests received by the server - server_received: mpsc::Sender, - } - - impl TestMessageHubServer { - fn new( - server_send: mpsc::Receiver>>, - server_received: mpsc::Sender, - ) -> Self { - Self { - server_send: Mutex::new(server_send), - server_received, - } + impl MockDeviceObject { + fn mock_object() -> AstarteObject { + AstarteObject::from_iter([ + ("endpoint1".to_string(), AstarteType::Double(4.2)), + ( + "endpoint2".to_string(), + AstarteType::String("obj".to_string()), + ), + ( + "endpoint3".to_string(), + AstarteType::BooleanArray(vec![true, false, true]), + ), + ]) } } - #[async_trait] - impl MessageHub for TestMessageHubServer { - type AttachStream = - futures::stream::Iter>>; - - async fn attach( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - // retrieve the node id from the metadata request - let node_id = get_node_id_from_metadata(&request).map_err(|s| *s)?; - - let inner = request.into_inner(); - println!("Client '{node_id}' attached"); - - self.server_received.send(ServerReceivedRequest::Attach((node_id, inner))).await - .expect("Could not send notification of a server received message, connect a channel to the Receiver"); - - let mut receiver_lock = self.server_send.lock().await; - - let response_vec = receiver_lock.recv().await.unwrap(); + struct MockServerObject {} - Ok(tonic::Response::new(futures::stream::iter(response_vec))) - } - - async fn send( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - self.server_received.send(ServerReceivedRequest::Send(request.into_inner())).await - .expect("Could not send notification of a server received message, connect a channel to the Receiver"); - - Ok(tonic::Response::new(Empty::default())) - } - - async fn detach( - &self, - _request: tonic::Request, - ) -> Result, tonic::Status> { - println!("Client detached"); - - self.server_received.send(ServerReceivedRequest::Detach(Empty{})).await - .expect("Could not send notification of a server received message, connect a channel to the Receiver"); - - Ok(tonic::Response::new(Empty::default())) + impl MockServerObject { + fn mock_object() -> AstarteObject { + AstarteObject::from_iter([ + ("double_endpoint".to_string(), AstarteType::Double(4.2)), + ( + "string_endpoint".to_string(), + AstarteType::String("obj".to_string()), + ), + ( + "boleanarray_endpoint".to_string(), + AstarteType::BooleanArray(vec![true, false, true]), + ), + ]) } + } - async fn add_interfaces( + pub(crate) trait InterfaceRequestUtils { + fn match_interfaces( &self, - request: tonic::Request, - ) -> Result, Status> { - self.server_received.send(ServerReceivedRequest::AddInterfaces(request.into_inner())).await.expect("Could not send notification of a server received message, connect a channel to the Receiver"); - - Ok(tonic::Response::new(Empty::default())) - } + interfaces: &[Interface], + ) -> Result>; + } - async fn remove_interfaces( + impl InterfaceRequestUtils for Request { + fn match_interfaces( &self, - request: tonic::Request, - ) -> Result, Status> { - self.server_received.send(ServerReceivedRequest::RemoveInterfaces(request.into_inner())).await.expect("Could not send notification of a server received message, connect a channel to the Receiver"); - - Ok(tonic::Response::new(Empty::default())) + interfaces: &[Interface], + ) -> Result> { + let mut request_interfaces = self.get_ref().interfaces_json.clone(); + request_interfaces.sort_unstable(); + + let mut expected_interfaces = interfaces + .iter() + .map(serde_json::to_string) + .collect::, _>>()?; + expected_interfaces.sort_unstable(); + + Ok(request_interfaces == expected_interfaces) } + } - async fn get_properties( + impl InterfaceRequestUtils for Request { + fn match_interfaces( &self, - request: Request, - ) -> Result, Status> { - self.server_received - .send(ServerReceivedRequest::GetProperties(request.into_inner())) - .await.expect("Could not send notification of a server received message, connect a channel to the Receiver"); - - // return no properties - Ok(tonic::Response::new(StoredProperties { - interface_properties: HashMap::new(), - })) - } + interfaces: &[Interface], + ) -> Result> { + let mut request_interfaces = self.get_ref().names.clone(); + request_interfaces.sort_unstable(); - async fn get_all_properties( - &self, - request: Request, - ) -> Result, Status> { - self.server_received - .send(ServerReceivedRequest::GetAllProperties( - request.into_inner(), - )) - .await.expect("Could not send notification of a server received message, connect a channel to the Receiver"); - - // return no properties - Ok(tonic::Response::new(StoredProperties { - interface_properties: HashMap::new(), - })) - } + let mut expected_interfaces = + interfaces.iter().map(|i| i.interface_name()).collect_vec(); + expected_interfaces.sort_unstable(); - async fn get_property( - &self, - request: Request, - ) -> Result, Status> { - self.server_received - .send(ServerReceivedRequest::GetProperty(request.into_inner())) - .await.expect("Could not send notification of a server received message, connect a channel to the Receiver"); - - // return no properties - Ok(tonic::Response::new(Property { - path: "".to_owned(), - data: None, - })) + Ok(request_interfaces == expected_interfaces) } } - fn make_server( - sock: TcpListener, - server: TestMessageHubServer, - ) -> Result, Box> { - Ok(async move { - tonic::transport::Server::builder() - .add_service(MessageHubServer::new(server)) - .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(sock)) - .await - .unwrap(); - }) - } - - async fn make_client( - addr: SocketAddr, - interceptor: NodeIdInterceptor, - ) -> impl Future { - async move { - let channel = loop { - let channel_res = tonic::transport::Endpoint::try_from(format!("http://{}", addr)) - .unwrap() - .connect() - .await; + impl InterfaceRequestUtils for InterfacesName { + fn match_interfaces( + &self, + interfaces: &[Interface], + ) -> Result> { + let mut request_interfaces = self.names.clone(); + request_interfaces.sort_unstable(); - match channel_res { - Ok(channel) => break channel, - Err(err) => println!("Failed attempt of connecting with error: {}", err), - } - }; + let mut expected_interfaces = + interfaces.iter().map(|i| i.interface_name()).collect_vec(); + expected_interfaces.sort_unstable(); - MessageHubClient::with_interceptor(channel, interceptor) + Ok(request_interfaces == expected_interfaces) } } - pub(crate) async fn mock_grpc_actors( - server_impl: TestMessageHubServer, - ) -> Result< - (impl Future, impl Future), - Box, - > { - // bind to port 0 to make the kernel choose an open port - let socket = tokio::net::TcpListener::bind("127.0.0.1:0").await?; - - let addr = socket.local_addr()?; - - let server = make_server(socket, server_impl)?; - - let interceptor = NodeIdInterceptor::new(ID); - - let client = make_client(addr, interceptor).await; - - Ok((server, client)) - } - - pub(crate) async fn mock_astarte_grpc_client( - mut message_hub_client: MsgHubClient, + async fn mock_grpc( + message_hub_client_tx: MsgHubClient, + mut message_hub_client_rx: MsgHubClient, interfaces: &Interfaces, store: S, ) -> Result<(GrpcClient, Grpc), Box> where S: PropertyStore, { - let node_data = NodeData::try_from(interfaces)?; - let stream = Grpc::::attach(&mut message_hub_client, node_data).await?; + let store = StoreWrapper::new(store); + let volatile = SharedVolatileStore::with_capacity(DEFAULT_VOLATILE_CAPACITY); + let client = GrpcClient::new(message_hub_client_tx, store, volatile); - let client = GrpcClient::new( - message_hub_client.clone(), - StoreWrapper::new(store), - SharedVolatileStore::with_capacity(DEFAULT_VOLATILE_CAPACITY), - ); - let grcp = Grpc::new(ID, message_hub_client, stream); + let node_data = NodeData::try_from(interfaces)?; + let stream = Grpc::::attach(&mut message_hub_client_rx, node_data).await?; + let connection = Grpc::new(ID, message_hub_client_rx, stream); - Ok((client, grcp)) + Ok((client, connection)) } - pub(crate) struct TestServerChannels { - pub(crate) server_response_sender: - mpsc::Sender>>, - pub(crate) server_request_receiver: mpsc::Receiver, - } + fn mock_stream(v: I) -> Streaming + where + I: IntoIterator, Status>>, + T: Send + Clone, + { + let mut streaming_server_response = Streaming::new(); - pub(crate) fn build_test_message_hub_server() -> (TestMessageHubServer, TestServerChannels) { - // Holds the stream of messages that will follow an attach, the server stores the receiver - // and relays messages to the stream got by the client that called `attach` - let server_response_channel = mpsc::channel(10); - - // This channel holds requests that arrived to the server and can be used to verify that the - // requests received are correct, the server will store the transmitting end of the channel - // to send events when a new request is received - let server_request_channel = mpsc::channel(10); - - ( - TestMessageHubServer::new(server_response_channel.1, server_request_channel.0), - TestServerChannels { - server_response_sender: server_response_channel.0, - server_request_receiver: server_request_channel.1, - }, - ) - } + v.into_iter().for_each(|resp| { + streaming_server_response + .expect_message() + .return_once(move || resp); + }); - macro_rules! expect_messages { - ($poll_result_fn:expr; $($pattern:pat $($(=> $var:ident = $expr_value:expr;)? $(if $guard:expr)?),*),+) => {{ - let mut i = 0usize; - - $( - // One based indexing - i += 1; - - match $poll_result_fn { - Result::Ok(v) => { - match v { - $pattern => { - $( - $( - let $var = $expr_value; - )? - - $(if !($guard) { - panic!("The message n.{} didn't pass the guard '{}'", i, stringify!($guard)); - })? - )* - - println!("Matched message n.{}", i); - }, - // Depending on the user declaration this pattern could be unreachable and this is fine - #[allow(unreachable_patterns)] - actual => panic!("Expected message n.{} to be matching the pattern '{}' but got '{:?}'", i, stringify!($pattern), actual), - } - } - Result::Err(e) => { - panic!("Expected message n.{} with pattern '{}' but the `{}` returned an `Err` {:?}", i, stringify!($pattern), stringify!($poll_result_fn), e); - } - } - )+ - }}; + streaming_server_response } - pub(crate) use expect_messages; - #[tokio::test] async fn test_attach_detach() { - let (server_impl, mut channels) = build_test_message_hub_server(); - let (server_future, client_future) = mock_grpc_actors(server_impl) - .await - .expect("Could not construct test client and server"); - - // no messages are read as responses by the server so we pass an empty vec - channels.server_response_sender.send(vec![]).await.unwrap(); - - let client_operations = async move { - let client = client_future.await; - // When the grpc connection gets created the attach methods is called - let (mut client, _connection) = - mock_astarte_grpc_client(client, &Interfaces::new(), MemoryStore::new()) - .await - .unwrap(); - - // manually calling detach - client.disconnect().await.unwrap(); - }; - - tokio::select! { - _ = server_future => panic!("The server closed before the client could complete sending the data"), - _ = client_operations => println!("Client sent its data"), - } + let mut seq = Sequence::new(); + // no expectations for the store + let mock_store_client = MsgHubClient::new(); + let store = GrpcStore::new(mock_store_client); + let mut mock_client_tx = MsgHubClient::new(); + let mut mock_client_rx = MsgHubClient::new(); + + // When the grpc connection gets created the attach methods is called + mock_client_rx + .expect_attach::>() + .times(1) + .in_sequence(&mut seq) + .returning(|_i| { + // no messages are read as responses by the server so we pass an empty vec + Ok(tonic::Response::new(mock_stream([]))) + }); + // when disconnect is called detach gets called internally + mock_client_tx + .expect_detach::>() + .times(1) + .in_sequence(&mut seq) + .returning(|_i: Request<_>| Ok(tonic::Response::new(pbjson_types::Empty {}))); + + let (mut client, _connection) = + mock_grpc(mock_client_tx, mock_client_rx, &Interfaces::new(), store) + .await + .unwrap(); - expect_messages!(channels.server_request_receiver.try_recv(); - ServerReceivedRequest::Attach((id, _)) if id == ID, - ServerReceivedRequest::Detach(_), - ); + client.disconnect().await.unwrap(); } #[tokio::test] async fn test_server_error() { - let (server_impl, mut channels) = build_test_message_hub_server(); - let (server_future, client_future) = mock_grpc_actors(server_impl) - .await - .expect("Could not construct test client and server"); + let mut seq = Sequence::new(); + // no expectations for the store + let mock_store_client = MsgHubClient::new(); + let store = GrpcStore::new(mock_store_client); + let mut mock_client_tx = MsgHubClient::new(); + let mut mock_client_rx = MsgHubClient::new(); + + // 2 attach and 2 error returned + mock_client_rx + .expect_attach::>() + .times(2) + .in_sequence(&mut seq) + .returning(|_i| { + Ok(tonic::Response::new(mock_stream( + // send an Err response as the first message + [Err(tonic::Status::unknown("Test unknown reattach"))], + ))) + }); + // attach no responses + mock_client_rx + .expect_attach::>() + .times(1) + .in_sequence(&mut seq) + .returning(|_i| Ok(tonic::Response::new(mock_stream([])))); + // expect detach + mock_client_tx + .expect_detach::>() + .times(1) + .in_sequence(&mut seq) + .returning(|_i: Request<_>| Ok(tonic::Response::new(pbjson_types::Empty {}))); + + // first attach is called when the connection is created + let (mut client, mut connection) = + mock_grpc(mock_client_tx, mock_client_rx, &Interfaces::new(), store) + .await + .unwrap(); + // poll the next message (error) + assert!(matches!(connection.next_event().await, Ok(None))); + // reconnect (second attach) + assert!(matches!( + connection.reconnect(&Interfaces::new()).await, + Ok(()) + )); + // poll the next message (second error) + assert!(matches!(connection.next_event().await, Ok(None))); + // after the second error we reconnect with no messages + assert!(matches!( + connection.reconnect(&Interfaces::new()).await, + Ok(()) + )); + + // manually calling detach + client.disconnect().await.unwrap(); + } - let err = tonic::Status::unknown("Test unknown reattach"); - println!("{:?} eq {}", err, err.code() == tonic::Code::Unknown); + #[tokio::test] + async fn test_add_remove_interface() { + let mut seq = Sequence::new(); + let mock_store_client = MsgHubClient::new(); + let store = GrpcStore::new(mock_store_client); + let mut mock_client_tx = MsgHubClient::new(); + let mut mock_client_rx = MsgHubClient::new(); + + // When the grpc connection gets created the attach methods is called + mock_client_rx + .expect_attach::>() + .times(1) + .in_sequence(&mut seq) + .returning(|_i| { + // no messages are read as responses by the server so we pass an empty vec + Ok(tonic::Response::new(mock_stream(vec![]))) + }); + mock_client_tx + .expect_add_interfaces::>() + .times(1) + .in_sequence(&mut seq) + .with(predicate::function(|r: &Request<_>| { + r.match_interfaces(&[Interface::from_str(crate::test::DEVICE_PROPERTIES).unwrap()]) + .unwrap() + })) + .returning(|_i: Request<_>| Ok(tonic::Response::new(pbjson_types::Empty {}))); + mock_client_tx + .expect_remove_interfaces::>() + .times(1) + .in_sequence(&mut seq) + .with(predicate::function(|r: &Request<_>| { + r.match_interfaces(&[Interface::from_str(crate::test::DEVICE_PROPERTIES).unwrap()]) + .unwrap() + })) + .returning(|_i: Request<_>| Ok(tonic::Response::new(pbjson_types::Empty {}))); + mock_client_tx + .expect_add_interfaces::>() + .times(1) + .in_sequence(&mut seq) + .with(predicate::function(|r: &Request<_>| { + r.match_interfaces(&[ + Interface::from_str(crate::test::DEVICE_PROPERTIES).unwrap(), + Interface::from_str(crate::test::E2E_DEVICE_PROPERTY).unwrap(), + ]) + .unwrap() + })) + .returning(|_i: Request<_>| Ok(tonic::Response::new(pbjson_types::Empty {}))); + mock_client_tx + .expect_remove_interfaces::>() + .times(1) + .in_sequence(&mut seq) + .with(predicate::function(|r: &Request<_>| { + r.match_interfaces(&[ + Interface::from_str(crate::test::DEVICE_PROPERTIES).unwrap(), + Interface::from_str(crate::test::E2E_DEVICE_PROPERTY).unwrap(), + ]) + .unwrap() + })) + .returning(|_i: Request<_>| Ok(tonic::Response::new(pbjson_types::Empty {}))); + // when disconnect is called detach is called + mock_client_tx + .expect_detach::>() + .times(1) + .in_sequence(&mut seq) + .returning(|_i: Request<_>| Ok(tonic::Response::new(pbjson_types::Empty {}))); + + let (mut client, _connection) = + mock_grpc(mock_client_tx, mock_client_rx, &Interfaces::new(), store) + .await + .unwrap(); - // send first error which causes a reattach - channels - .server_response_sender - .send(vec![Err(tonic::Status::unknown("Test unknown reattach"))]) + let interfaces = Interfaces::new(); + let interface = Interface::from_str(crate::test::DEVICE_PROPERTIES).unwrap(); + let validated = interfaces.validate(interface.clone()).unwrap().unwrap(); + client.add_interface(&interfaces, &validated).await.unwrap(); + client + .remove_interface(&interfaces, &validated) .await .unwrap(); - // send second error which causes a reattach - channels - .server_response_sender - .send(vec![Err(tonic::Status::unavailable( - "Test unavailable reattach", - ))]) + let additional_interface: Interface = + Interface::from_str(crate::test::E2E_DEVICE_PROPERTY).unwrap(); + let list_to_add = Interfaces::new() + .validate_many([interface.clone(), additional_interface.clone()]) + .unwrap(); + client + .extend_interfaces(&interfaces, &list_to_add) .await .unwrap(); - // no reattach - channels - .server_response_sender - .send(vec![Err(tonic::Status::not_found("Test no reattach"))]) + let to_remove = HashMap::from([ + (interface.interface_name(), &interface), + (additional_interface.interface_name(), &additional_interface), + ]); + client + .remove_interfaces(&interfaces, &to_remove) .await .unwrap(); - let client_operations = async move { - let client = client_future.await; - // When the grpc connection gets created the attach methods is called - let (mut client, mut connection) = - mock_astarte_grpc_client(client, &Interfaces::new(), MemoryStore::new()) - .await - .unwrap(); - - // poll the three messages the first two received errors will simply reconnect without returning - assert!(matches!(connection.next_event().await, Ok(None))); - - // to reconnect but errors - assert!(matches!( - connection.reconnect(&Interfaces::new()).await, - Ok(()) - )); - - // second error - assert!(matches!(connection.next_event().await, Ok(None))); - - assert!(matches!( - connection.reconnect(&Interfaces::new()).await, - Ok(()) - )); - - // third error - assert!(matches!(connection.next_event().await, Ok(None))); - - // manually calling detach - client.disconnect().await.unwrap(); - }; - - tokio::select! { - _ = server_future => panic!("The server closed before the client could complete sending the data"), - _ = client_operations => println!("Client sent its data"), - } - - expect_messages!(channels.server_request_receiver.try_recv(); - // connection creation attach - ServerReceivedRequest::Attach((id, _)) if id == ID, - // first error attach - ServerReceivedRequest::Attach((id, _)) if id == ID, - // second error attach - ServerReceivedRequest::Attach((id, _)) if id == ID, - ServerReceivedRequest::Detach(_), - ); + // manually calling detach + client.disconnect().await.unwrap(); } #[tokio::test] - async fn test_add_remove_interface() { - let (server_impl, mut channels) = build_test_message_hub_server(); - let (server_future, client_future) = mock_grpc_actors(server_impl) - .await - .expect("Could not construct test client and server"); - - channels.server_response_sender.send(vec![]).await.unwrap(); - - let client_operations = async move { - let client = client_future.await; - // When the grpc connection gets created the attach methods is called - let (mut client, _connection) = - mock_astarte_grpc_client(client, &Interfaces::new(), MemoryStore::new()) - .await - .unwrap(); - - let interfaces = Interfaces::new(); - - let interface = Interface::from_str(crate::test::DEVICE_PROPERTIES).unwrap(); - let validated = interfaces.validate(interface.clone()).unwrap().unwrap(); - - client.add_interface(&interfaces, &validated).await.unwrap(); - - client - .remove_interface(&interfaces, &validated) - .await - .unwrap(); - - let additional_interface = - Interface::from_str(crate::test::E2E_DEVICE_PROPERTY).unwrap(); - let to_add = Interfaces::new() - .validate_many([interface.clone(), additional_interface.clone()]) - .unwrap(); + async fn test_send_individual() { + let mut seq = Sequence::new(); + let mock_store_client = MsgHubClient::new(); + let store = GrpcStore::new(mock_store_client); + let mut mock_client_tx = MsgHubClient::new(); + let mut mock_client_rx = MsgHubClient::new(); - client - .extend_interfaces(&interfaces, &to_add) - .await - .unwrap(); + const PATH: &str = "/1/name"; + const STRING_VALUE: &str = "value"; + let interface = Interface::from_str(crate::test::DEVICE_PROPERTIES).unwrap(); + let interface_name = interface.interface_name().to_owned(); + + mock_client_rx + .expect_attach::>() + .times(1) + .in_sequence(&mut seq) + .returning(|_i| { + // no messages are read as responses by the server so we pass an empty vec + Ok(tonic::Response::new(mock_stream(vec![]))) + }); + + let interface_name_cl = interface_name.clone(); + mock_client_tx + .expect_send::>() + .times(1) + .in_sequence(&mut seq) + .with(predicate::function(move |r: &Request| { + DeviceEvent::try_from(r.get_ref().clone()).is_ok_and(|e| { + e.interface == interface_name_cl + && e.path == PATH + && matches!(e.data, Value::Individual(AstarteType::String(v)) + if v == STRING_VALUE) + }) + })) + .returning(|_i: Request<_>| Ok(tonic::Response::new(pbjson_types::Empty {}))); - let to_remove = HashMap::from([ - (interface.interface_name(), &interface), - (additional_interface.interface_name(), &additional_interface), - ]); + mock_client_tx + .expect_detach::>() + .times(1) + .in_sequence(&mut seq) + .returning(|_i: Request<_>| Ok(tonic::Response::new(pbjson_types::Empty {}))); - client - .remove_interfaces(&interfaces, &to_remove) + let (mut client, _connection) = + mock_grpc(mock_client_tx, mock_client_rx, &Interfaces::new(), store) .await .unwrap(); - // manually calling detach - client.disconnect().await.unwrap(); - }; - - tokio::select! { - _ = server_future => panic!("The server closed before the client could complete sending the data"), - _ = client_operations => println!("Client sent its data"), - } - - let interface = Interface::from_str(crate::test::DEVICE_PROPERTIES).unwrap(); - let additional_interface = Interface::from_str(crate::test::E2E_DEVICE_PROPERTY).unwrap(); - - let mut expect_added = [&additional_interface, &interface] - .map(|i| serde_json::to_string(i).unwrap()) - .to_vec(); - expect_added.sort(); - - let mut expect_removed = vec![ - additional_interface.interface_name().to_string(), - interface.interface_name().to_string(), - ]; - expect_removed.sort(); - - expect_messages!(channels.server_request_receiver.try_recv(); - // connection creation attach - ServerReceivedRequest::Attach((id, _)) if id == ID, - // add interface - ServerReceivedRequest::AddInterfaces(i) if i.interfaces_json == vec![serde_json::to_string(&interface).unwrap()], - // remove interface - ServerReceivedRequest::RemoveInterfaces(i) if i.names == vec![interface.interface_name().to_string()], - // add more interfaces - ServerReceivedRequest::AddInterfaces(mut i) - => ordered = {i.interfaces_json.sort(); i.interfaces_json} ; - if ordered == expect_added, - // remove more interfaces - ServerReceivedRequest::RemoveInterfaces(mut i) - => ordered = {i.names.sort(); i.names} ; - if ordered == expect_removed, - // detach - ServerReceivedRequest::Detach(Empty {}), - ); - } - - #[tokio::test] - async fn test_send_individual() { - let (server_impl, mut channels) = build_test_message_hub_server(); - let (server_future, client_future) = mock_grpc_actors(server_impl) - .await - .expect("Could not construct test client and server"); - - const INTERFACE_NAME: &str = - "org.astarte-platform.rust.examples.individual-properties.DeviceProperties"; - const STRING_VALUE: &str = "value"; - - // no messages are read as responses by the server so we pass an empty vec - channels.server_response_sender.send(vec![]).await.unwrap(); - - let client_operations = async move { - let client = client_future.await; - - let path = MappingPath::try_from("/1/name").unwrap(); - let interfaces = - Interfaces::from_iter([ - Interface::from_str(crate::test::DEVICE_PROPERTIES).unwrap() - ]); - let mapping_ref = interfaces.interface_mapping(INTERFACE_NAME, &path).unwrap(); - - let (mut client, _connection) = - mock_astarte_grpc_client(client, &interfaces, MemoryStore::new()) - .await - .unwrap(); - - let validated_individual = mock_validate_individual( - mapping_ref, - &path, - AstarteType::String(STRING_VALUE.to_string()), - None, - ) + let path = MappingPath::try_from(PATH).unwrap(); + let interfaces = Interfaces::from_iter([interface]); + let mapping_ref = interfaces + .interface_mapping(&interface_name, &path) .unwrap(); - - client.send_individual(validated_individual).await.unwrap(); - - client.disconnect().await.unwrap(); - }; - - // Poll client and server future - tokio::select! { - _ = server_future => panic!("The server closed before the client could complete sending the data"), - _ = client_operations => println!("Client sent its data"), - } - - expect_messages!(channels.server_request_receiver.try_recv(); - ServerReceivedRequest::Attach((id, _)) if id == ID, - ServerReceivedRequest::Send(m) - => data_event = DeviceEvent::try_from(m).expect("Malformed message"); - if data_event.interface == "org.astarte-platform.rust.examples.individual-properties.DeviceProperties" - && data_event.path == "/1/name" - && matches!(data_event.data, Value::Individual(AstarteType::String(v)) if v == STRING_VALUE), - ServerReceivedRequest::Detach(_), - ); - } - - fn mock_astarte_object() -> AstarteObject { - let mut obj = AstarteObject::new(); - obj.insert("endpoint1".to_string(), AstarteType::Double(4.2)); - obj.insert( - "endpoint2".to_string(), - AstarteType::String("obj".to_string()), - ); - obj.insert( - "endpoint3".to_string(), - AstarteType::BooleanArray(vec![true]), - ); - - obj + let validated = ValidatedIndividual::validate( + mapping_ref, + &path, + AstarteType::String(STRING_VALUE.to_string()), + None, + ) + .unwrap(); + client.send_individual(validated).await.unwrap(); + client.disconnect().await.unwrap(); } #[tokio::test] async fn test_send_object_timestamp() { - let (server_impl, mut channels) = build_test_message_hub_server(); - let (server_future, client_future) = mock_grpc_actors(server_impl) - .await - .expect("Could not construct test client and server"); - - // no messages are read as responses by the server so we pass an empty vec - channels.server_response_sender.send(vec![]).await.unwrap(); - - let client_operations = async move { - let client = client_future.await; + let mut seq = Sequence::new(); + let mock_store_client = MsgHubClient::new(); + let store = GrpcStore::new(mock_store_client); + let mut mock_client_tx = MsgHubClient::new(); + let mut mock_client_rx = MsgHubClient::new(); + + const PATH: &str = "/1"; + let interface = Interface::from_str(crate::test::OBJECT_DEVICE_DATASTREAM).unwrap(); + let interface_name = interface.interface_name().to_owned(); + + mock_client_rx + .expect_attach::>() + .times(1) + .in_sequence(&mut seq) + .returning(|_i| { + // no messages are read as responses by the server so we pass an empty vec + Ok(tonic::Response::new(mock_stream([]))) + }); + + let interface_name_cl = interface_name.clone(); + mock_client_tx + .expect_send::>() + .times(1) + .in_sequence(&mut seq) + .with(predicate::function(move |r: &Request| { + DeviceEvent::try_from(r.get_ref().clone()).is_ok_and(|e| { + e.interface == interface_name_cl + && e.path == PATH + && matches!(e.data, Value::Object(o) + if MockDeviceObject::mock_object() == o) + }) + })) + .returning(|_i: Request<_>| Ok(tonic::Response::new(pbjson_types::Empty {}))); - let interface = Interface::from_str(crate::test::OBJECT_DEVICE_DATASTREAM).unwrap(); - let path = MappingPath::try_from("/1").unwrap(); - let interfaces = Interfaces::from_iter([interface.clone()]); + mock_client_tx + .expect_detach::>() + .times(1) + .in_sequence(&mut seq) + .returning(|_i: Request<_>| Ok(tonic::Response::new(pbjson_types::Empty {}))); - let (mut client, _connection) = - mock_astarte_grpc_client(client, &interfaces, MemoryStore::new()) - .await - .unwrap(); + let (mut client, _connection) = + mock_grpc(mock_client_tx, mock_client_rx, &Interfaces::new(), store) + .await + .unwrap(); - let validated_object = mock_validate_object( - &interface, - &path, - mock_astarte_object(), - Some(chrono::offset::Utc::now()), - ) + let path = MappingPath::try_from(PATH).unwrap(); + let interfaces = Interfaces::from_iter([interface]); + let object_ref = interfaces + .get(&interface_name) + .and_then(ObjectRef::new) .unwrap(); + let validated = + ValidatedObject::validate(object_ref, &path, MockDeviceObject::mock_object(), None) + .unwrap(); - client.send_object(validated_object).await.unwrap() - }; - - tokio::select! { - _ = server_future => panic!("The server closed before the client could complete sending the data"), - _ = client_operations => println!("Client sent its data"), - } - - expect_messages!(channels.server_request_receiver.try_recv(); - ServerReceivedRequest::Attach((id, _)) if id == ID, - ServerReceivedRequest::Send(m) - => data_event = DeviceEvent::try_from(m).expect("Malformed message"); - if data_event.interface == "org.astarte-platform.rust.examples.object-datastream.DeviceDatastream" - && data_event.path == "/1", - => object_value = { let Value::Object(v) = data_event.data else { panic!("Expected object") }; v }; - if object_value.get("endpoint1") == Some(&AstarteType::Double(4.2)) - && object_value.get("endpoint2") == Some(&AstarteType::String("obj".to_string())) - && object_value.get("endpoint3") == Some(&AstarteType::BooleanArray(vec![true])) - ); + client.send_object(validated).await.unwrap(); + client.disconnect().await.unwrap(); } #[tokio::test] async fn test_connection_receive_object() { - let (server_impl, channels) = build_test_message_hub_server(); - let (server_future, client_future) = mock_grpc_actors(server_impl) - .await - .expect("Could not construct test client and server"); - - let expected_object = Value::Object(mock_astarte_object()); - + let mut seq = Sequence::new(); + let mock_store_client = MsgHubClient::new(); + let store = GrpcStore::new(mock_store_client); + let mock_client_tx = MsgHubClient::new(); + let mut mock_client_rx = MsgHubClient::new(); + + const PATH: &str = "/1"; + let interface = Interface::from_str(crate::test::E2E_SERVER_DATASTREAM).unwrap(); + let interface_name = interface.interface_name().to_owned(); + let expected_object = Value::Object(MockServerObject::mock_object()); let proto_payload: astarte_message_hub_proto::astarte_message::Payload = expected_object.into(); + let astarte_message = super::convert::test::new_astarte_message( + interface_name.clone(), + PATH.to_string(), + None, + proto_payload.clone(), + ); - let astarte_message = AstarteMessage { - interface_name: "org.astarte-platform.rust.examples.object-datastream.DeviceDatastream" - .to_string(), - path: "/1".to_string(), - timestamp: None, - payload: Some(proto_payload.clone()), - }; - - // Send object from server - channels - .server_response_sender - .send(vec![Ok(astarte_message.into())]) - .await - .unwrap(); - - let interfaces = - Interfaces::from_iter([ - Interface::from_str(crate::test::OBJECT_DEVICE_DATASTREAM).unwrap() - ]); - - let client_connection = async { - let client = client_future.await; - - mock_astarte_grpc_client(client, &interfaces, MemoryStore::new()).await - }; - - let (_client, mut connection) = tokio::select! { - _ = server_future => panic!("The server closed before the client could complete sending the data"), - res = client_connection => { - println!("Client connected correctly: {}", res.is_ok()); + mock_client_rx + .expect_attach::>() + .times(1) + .in_sequence(&mut seq) + .returning(move |_i| { + Ok(tonic::Response::new(mock_stream([Ok(Some( + astarte_message_hub_proto::MessageHubEvent { + event: Some( + astarte_message_hub_proto::message_hub_event::Event::Message( + astarte_message.clone(), + ), + ), + }, + ))]))) + }); + + let (_client, mut connection) = + mock_grpc(mock_client_tx, mock_client_rx, &Interfaces::new(), store) + .await + .unwrap(); - res.expect("Expected correct connection in test") - }, + let Some(event) = connection.next_event().await.unwrap() else { + panic!("Event received did not match the pattern"); }; - expect_messages!(connection.next_event().await; - Some(ReceivedEvent { - ref interface, - ref path, - payload: GrpcPayload { - data, - timestamp: None, - }, - }) if interface == "org.astarte-platform.rust.examples.object-datastream.DeviceDatastream" - && path == "/1" - && data == proto_payload - ); + assert_eq!(event.interface, interface_name); + assert_eq!(event.path, PATH); + assert_eq!(event.payload.data, proto_payload); } #[tokio::test] async fn test_connection_receive_unset() { - let (server_impl, channels) = build_test_message_hub_server(); - let (server_future, client_future) = mock_grpc_actors(server_impl) - .await - .expect("Could not construct test client and server"); - - let exp_interface = - "org.astarte-platform.rust.examples.individual-properties.ServerProperties"; - let exp_path = "/1/enable"; + let mut seq = Sequence::new(); + let mock_store_client = MsgHubClient::new(); + let store = GrpcStore::new(mock_store_client); + let mock_client_tx = MsgHubClient::new(); + let mut mock_client_rx = MsgHubClient::new(); + + const PATH: &str = "/1/enable"; + let interface = Interface::from_str(crate::test::SERVER_PROPERTIES).unwrap(); + let interface_name = interface.interface_name().to_owned(); let proto_payload: ProtoPayload = Value::Unset.into(); let astarte_message = super::convert::test::new_astarte_message( - exp_interface.to_string(), - exp_path.to_string(), + interface_name.clone(), + PATH.to_string(), None, proto_payload.clone(), ); - // Send object from server - channels - .server_response_sender - .send(vec![Ok(astarte_message.into())]) - .await - .unwrap(); - - let interfaces = - Interfaces::from_iter([Interface::from_str(crate::test::SERVER_PROPERTIES).unwrap()]); - - let client_connection = async { - let client = client_future.await; - - mock_astarte_grpc_client(client, &interfaces, MemoryStore::new()).await - }; - - let (_client, mut connection) = tokio::select! { - _ = server_future => panic!("The server closed before the client could complete sending the data"), - res = client_connection => { - println!("Client connected correctly: {}", res.is_ok()); + mock_client_rx + .expect_attach::>() + .times(1) + .in_sequence(&mut seq) + .returning(move |_i| { + Ok(tonic::Response::new(mock_stream([Ok(Some( + astarte_message_hub_proto::MessageHubEvent { + event: Some( + astarte_message_hub_proto::message_hub_event::Event::Message( + astarte_message.clone(), + ), + ), + }, + ))]))) + }); + + let (_client, mut connection) = + mock_grpc(mock_client_tx, mock_client_rx, &Interfaces::new(), store) + .await + .unwrap(); - res.expect("Expected correct connection in test") - }, + let Some(event) = connection.next_event().await.unwrap() else { + panic!("Event received did not match the pattern"); }; - assert!( - matches!(connection.next_event().await, Ok(Some(ReceivedEvent { - ref interface, - ref path, - payload: GrpcPayload { - data, - timestamp: None, - }, - })) if interface == exp_interface - && path == exp_path - && data == proto_payload) - ) + assert_eq!(event.interface, interface_name); + assert_eq!(event.path, PATH); + assert_eq!(event.payload.data, proto_payload); } #[test] diff --git a/src/transport/grpc/store.rs b/src/transport/grpc/store.rs index f2028e6f7..9c2f1831b 100644 --- a/src/transport/grpc/store.rs +++ b/src/transport/grpc/store.rs @@ -190,133 +190,159 @@ impl PropertyStore for GrpcStore { #[cfg(test)] mod test { + use std::collections::HashMap; use std::str::FromStr; - use astarte_message_hub_proto::pbjson_types::Empty; - use astarte_message_hub_proto::InterfacesName; + use astarte_message_hub_proto::tonic; use astarte_message_hub_proto::PropertyIdentifier; use astarte_message_hub_proto::StoredPropertiesFilter; + use astarte_message_hub_proto_mock::mockall::{predicate, Sequence}; + use super::super::test::InterfaceRequestUtils; use super::GrpcStore; + use super::MsgHubClient; use super::PropertyStore; use crate::interface::Ownership; use crate::store::PropertyMapping; use crate::store::StoredProp; - use crate::transport::grpc::test::build_test_message_hub_server; - use crate::transport::grpc::test::expect_messages; - use crate::transport::grpc::test::ServerReceivedRequest; - use crate::transport::grpc::test::ID; - use crate::transport::Disconnect; - use crate::transport::{ - grpc::test::{mock_astarte_grpc_client, mock_grpc_actors}, - Interfaces, - }; + use crate::transport::grpc::convert::test::new_property; + use crate::AstarteType; use crate::Interface; #[tokio::test] async fn test_grpc_store_grpc_client_calls() { - let (server_impl, mut channels) = build_test_message_hub_server(); - let (server_future, client_future) = mock_grpc_actors(server_impl) - .await - .expect("Could not construct test client and server"); - - channels.server_response_sender.send(vec![]).await.unwrap(); - - let client_operations = async move { - let client = client_future.await; - let grpc_store = GrpcStore::new(client.clone()); - // When the grpc connection gets created the attach methods is called - let (mut client, _connection) = - mock_astarte_grpc_client(client.clone(), &Interfaces::new(), grpc_store.clone()) - .await - .unwrap(); - - let _device_properties = grpc_store.device_props().await.unwrap(); - let _server_properties = grpc_store.server_props().await.unwrap(); - - let device_interface = Interface::from_str(crate::test::DEVICE_PROPERTIES).unwrap(); - let _device_interface_properties = grpc_store - .interface_props(&(&device_interface).into()) - .await - .unwrap(); - - let server_interface = Interface::from_str(crate::test::SERVER_PROPERTIES).unwrap(); - let _server_interface_properties = grpc_store - .interface_props(&(&server_interface).into()) - .await - .unwrap(); - - let device_path = "/path1"; - let device_prop_info = - PropertyMapping::new_unchecked((&device_interface).into(), device_path); - let _device_prop = grpc_store.load_prop(&device_prop_info, 1).await; - // no request should be made - let server_path = "/path1"; - let server_prop_info = - PropertyMapping::new_unchecked((&server_interface).into(), server_path); - let _server_prop = grpc_store.load_prop(&server_prop_info, 1).await; - - // manually calling detach - client.disconnect().await.unwrap(); - }; + let device_interface = Interface::from_str(crate::test::DEVICE_PROPERTIES).unwrap(); + let server_interface = Interface::from_str(crate::test::SERVER_PROPERTIES).unwrap(); + const PATH: &str = "/path1"; + let mut seq = Sequence::new(); + let mut mock_store_client = MsgHubClient::new(); + // device + mock_store_client + .expect_get_property::() + .times(1) + .in_sequence(&mut seq) + .with(predicate::function(|i: &PropertyIdentifier| { + i.interface_name + == Interface::from_str(crate::test::DEVICE_PROPERTIES) + .unwrap() + .interface_name() + && i.path == PATH + })) + .returning(|_i| Ok(tonic::Response::new(new_property(PATH.to_string(), None)))); + // server + mock_store_client + .expect_get_property::() + .times(1) + .in_sequence(&mut seq) + .with(predicate::function(|r: &PropertyIdentifier| { + r.interface_name + == Interface::from_str(crate::test::SERVER_PROPERTIES) + .unwrap() + .interface_name() + && r.path == PATH + })) + .returning(|_i| Ok(tonic::Response::new(new_property(PATH.to_string(), None)))); + // device + mock_store_client + .expect_get_all_properties::() + .times(1) + .in_sequence(&mut seq) + .with(predicate::function(|r: &StoredPropertiesFilter| { + r.ownership == Some(astarte_message_hub_proto::Ownership::Device as i32) + })) + .returning(|_i| { + Ok(tonic::Response::new( + astarte_message_hub_proto::StoredProperties { + interface_properties: HashMap::new(), + }, + )) + }); + // server + mock_store_client + .expect_get_all_properties::() + .times(1) + .in_sequence(&mut seq) + .with(predicate::function(|r: &StoredPropertiesFilter| { + r.ownership == Some(astarte_message_hub_proto::Ownership::Server as i32) + })) + .returning(|_i| { + Ok(tonic::Response::new( + astarte_message_hub_proto::StoredProperties { + interface_properties: HashMap::new(), + }, + )) + }); + // device + mock_store_client + .expect_get_properties::() + .times(1) + .in_sequence(&mut seq) + .withf(move |r| { + r.match_interfaces(&[Interface::from_str(crate::test::DEVICE_PROPERTIES).unwrap()]) + .unwrap() + }) + .returning(|_i| { + Ok(tonic::Response::new( + astarte_message_hub_proto::StoredProperties { + interface_properties: HashMap::new(), + }, + )) + }); + // server + mock_store_client + .expect_get_properties::() + .times(1) + .in_sequence(&mut seq) + .withf(move |r| { + r.match_interfaces(&[Interface::from_str(crate::test::SERVER_PROPERTIES).unwrap()]) + .unwrap() + }) + .returning(|_i| { + Ok(tonic::Response::new( + astarte_message_hub_proto::StoredProperties { + interface_properties: HashMap::new(), + }, + )) + }); - tokio::select! { - _ = server_future => panic!("The server closed before the client could complete sending the data"), - _ = client_operations => println!("Client sent its data"), - } + let grpc_store = GrpcStore::new(mock_store_client); + + let device_prop_info = PropertyMapping::new_unchecked((&device_interface).into(), PATH); + // the server should be called + let _device_prop = grpc_store.load_prop(&device_prop_info, 1).await; + + let server_prop_info = PropertyMapping::new_unchecked((&server_interface).into(), PATH); + // the server should be called + let _server_prop = grpc_store.load_prop(&server_prop_info, 1).await; + + // the server should be called + let _device_properties = grpc_store.device_props().await.unwrap(); + // the server should be called + let _server_properties = grpc_store.server_props().await.unwrap(); let device_interface = Interface::from_str(crate::test::DEVICE_PROPERTIES).unwrap(); + // the server should be called + let _device_interface_properties = grpc_store + .interface_props(&(&device_interface).into()) + .await + .unwrap(); + let server_interface = Interface::from_str(crate::test::SERVER_PROPERTIES).unwrap(); - expect_messages!(channels.server_request_receiver.try_recv(); - // connection creation attach - ServerReceivedRequest::Attach((id, _)) if id == ID, - // load device properties should call the message hub - ServerReceivedRequest::GetAllProperties(StoredPropertiesFilter { - ownership: Some(ownership), - }) if ownership == astarte_message_hub_proto::Ownership::Device as i32, - // load server properties should call the message hub - ServerReceivedRequest::GetAllProperties(StoredPropertiesFilter { - ownership: Some(ownership), - }) if ownership == astarte_message_hub_proto::Ownership::Server as i32, - // if a property interface is device owned the message hub should be called - ServerReceivedRequest::GetProperties(InterfacesName { - names, - }) if names.len() == 1 && names.first().unwrap() == device_interface.interface_name(), - // if a property interface is server owned the message hub should also be called - ServerReceivedRequest::GetProperties(InterfacesName { - names, - }) if names.len() == 1 && names.first().unwrap() == server_interface.interface_name(), - // loading a device property from the message hub - ServerReceivedRequest::GetProperty(PropertyIdentifier { - interface_name, - path, - }) if interface_name == device_interface.interface_name() && path == "/path1", - // loading a server property from the message hub - ServerReceivedRequest::GetProperty(PropertyIdentifier { - interface_name, - path, - }) if interface_name == server_interface.interface_name() && path == "/path1", - // detach - ServerReceivedRequest::Detach(Empty {}), - ); + // the server should be called + let _server_interface_properties = grpc_store + .interface_props(&(&server_interface).into()) + .await + .unwrap(); } #[tokio::test] async fn test_grpc_store_device_prop_not_stored() { - let (server_impl, mut _channels) = build_test_message_hub_server(); - let (_server_future, client_future) = mock_grpc_actors(server_impl) - .await - .expect("Could not construct test client and server"); - - let client = client_future.await; - let grpc_store = GrpcStore::new(client); - - let inner_value = crate::AstarteType::Integer(1); - let path = "/path1"; + let inner_value = AstarteType::Integer(1); + const PATH: &str = "/path1"; let server_interface = "com.server.interface"; let server_prop = StoredProp { interface: server_interface, - path, + path: PATH, value: &inner_value, interface_major: 1, ownership: Ownership::Server, @@ -325,20 +351,39 @@ mod test { let device_interface = "com.device.interface"; let device_prop = StoredProp { interface: device_interface, - path, + path: PATH, value: &inner_value, interface_major: 1, ownership: Ownership::Device, }; let device_interface_data = &(&device_prop).into(); + let mock_store_client = MsgHubClient::new(); + let grpc_store = GrpcStore::new(mock_store_client); + + // we do not store anything locally so no action should be performed + // no actions or calls to the server should be performed grpc_store.store_prop(server_prop).await.unwrap(); // no actions or calls to the server should be performed grpc_store.store_prop(device_prop).await.unwrap(); // no actions or calls to the server should be performed + grpc_store.unset_prop(server_interface_data).await.unwrap(); + // no actions or calls to the server should be performed + grpc_store.unset_prop(device_interface_data).await.unwrap(); + // no actions or calls to the server should be performed grpc_store.delete_prop(server_interface_data).await.unwrap(); // no actions or calls to the server should be performed grpc_store.delete_prop(device_interface_data).await.unwrap(); + // no actions or calls to the server should be performed + grpc_store + .delete_interface(server_interface_data) + .await + .unwrap(); + // no actions or calls to the server should be performed + grpc_store + .delete_interface(device_interface_data) + .await + .unwrap(); } }