diff --git a/Cargo.lock b/Cargo.lock index 2c1035a9f..f32ca0be5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,56 +66,12 @@ dependencies = [ "libc", ] -[[package]] -name = "anstream" -version = "0.6.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b" -dependencies = [ - "anstyle", - "anstyle-parse", - "anstyle-query", - "anstyle-wincon", - "colorchoice", - "is_terminal_polyfill", - "utf8parse", -] - [[package]] name = "anstyle" version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" -[[package]] -name = "anstyle-parse" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9" -dependencies = [ - "utf8parse", -] - -[[package]] -name = "anstyle-query" -version = "1.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c" -dependencies = [ - "windows-sys 0.59.0", -] - -[[package]] -name = "anstyle-wincon" -version = "3.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e" -dependencies = [ - "anstyle", - "once_cell", - "windows-sys 0.59.0", -] - [[package]] name = "anyhow" version = "1.0.97" @@ -185,7 +141,7 @@ dependencies = [ "cfg-if", "chrono", "color-eyre", - "env_logger", + "eyre", "flate2", "flume", "futures", @@ -570,12 +526,6 @@ dependencies = [ "tracing-error", ] -[[package]] -name = "colorchoice" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" - [[package]] name = "colored" version = "3.0.0" @@ -711,29 +661,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "env_filter" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0" -dependencies = [ - "log", - "regex", -] - -[[package]] -name = "env_logger" -version = "0.11.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3716d7a920fb4fac5d84e9d4bce8ceb321e9414b4409da61b07b75c1e3d0697" -dependencies = [ - "anstream", - "anstyle", - "env_filter", - "jiff", - "log", -] - [[package]] name = "equivalent" version = "1.0.2" @@ -1351,12 +1278,6 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" -[[package]] -name = "is_terminal_polyfill" -version = "1.70.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" - [[package]] name = "itertools" version = "0.11.0" @@ -1381,30 +1302,6 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" -[[package]] -name = "jiff" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c102670231191d07d37a35af3eb77f1f0dbf7a71be51a962dcd57ea607be7260" -dependencies = [ - "jiff-static", - "log", - "portable-atomic", - "portable-atomic-util", - "serde", -] - -[[package]] -name = "jiff-static" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cdde31a9d349f1b1f51a0b3714a5940ac022976f4b49485fc04be052b183b4c" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "jobserver" version = "0.1.32" @@ -1870,21 +1767,6 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" -[[package]] -name = "portable-atomic" -version = "1.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e" - -[[package]] -name = "portable-atomic-util" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" -dependencies = [ - "portable-atomic", -] - [[package]] name = "powerfmt" version = "0.2.0" @@ -2969,12 +2851,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" -[[package]] -name = "utf8parse" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" - [[package]] name = "uuid" version = "1.16.0" diff --git a/Cargo.toml b/Cargo.toml index 265b93288..75b461ba0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -103,7 +103,7 @@ astarte-message-hub-proto = { workspace = true } astarte-message-hub-proto-mock = { workspace = true } async-trait = { workspace = true } color-eyre = { workspace = true } -env_logger = { workspace = true } +eyre = { workspace = true } mockall = { workspace = true } mockito = { workspace = true } pretty_assertions = { workspace = true } @@ -143,7 +143,6 @@ bytes = "1.5.0" cfg-if = "1.0.0" chrono = "0.4.20" color-eyre = "0.6.3" -env_logger = "0.11.0" eyre = "0.6.12" flate2 = "1.0.0" flume = "0.11.0" @@ -182,3 +181,13 @@ webpki-roots = "0.26.0" # Transitive dependencies litemap = "=0.7.4" zerofrom = "=0.1.5" + +[[example]] +name = "msghub_client" +path = "examples/message_hub_client/main.rs" +required-features = ["message-hub", "derive"] + +[[example]] +name = "object_datastream" +path = "examples/object_datastream/main.rs" +required-features = ["derive"] diff --git a/README.md b/README.md index 4629a74d2..21e7f1595 100644 --- a/README.md +++ b/README.md @@ -25,11 +25,12 @@ Quick links: - [API documentation](https://docs.rs/astarte-device-sdk/latest/astarte_device_sdk/). - [Astarte documentation](https://docs.astarte-platform.org/latest/001-intro_user.html) for more information regarding Astarte. - [Get started](https://docs.rs/astarte-device-sdk/latest/astarte_device_sdk/_docs/_get_started/index.html) for a guide on how to use the SDK functionalities. +- [Connect to the Astarte MessageHub](https://docs.rs/astarte-device-sdk/latest/astarte_device_sdk/_docs/_connect_to_the_astarte_msghub/index.html) tutorial. - [OS requirements](https://github.com/astarte-platform/astarte-device-sdk-rust/tree/master/docs/os-requirements.md) for system libraries. ## Use the library -You can add the library tro your project with: +You can add the library to your project with: ```sh cargo new astarte-project && cd astarte-project diff --git a/docs/connect-to-the-astarte-message-hub.md b/docs/connect-to-the-astarte-message-hub.md new file mode 100644 index 000000000..01675d878 --- /dev/null +++ b/docs/connect-to-the-astarte-message-hub.md @@ -0,0 +1,514 @@ + + +# Connect to the Astarte MessageHub + +The Astarte Device SDK supports two different connection types: + +- **Astarte MQTT Protocol**: this is a direct connection from one device to Astarte +- **Message Hub**: multiple applications or sensors connect to the Hub that appears as a single + device to Astarte + +Here we will go through, step by step, on how to connect and send data to the Message Hub. + +> You can find the full code by going to the +> [message hub client example](https://github.com/astarte-platform/astarte-device-sdk-rust/tree/master/examples/message_hub_client) + +## Before you begin + +There are a few setup steps and requirements that are needed before start working on the example. + +First, you should be already familiar on how Astarte and the Device SDK works, so you should pause +here and follow the +[Get Started](https://docs.rs/astarte-device-sdk/latest/astarte_device_sdk/_docs/_get_started/index.html) +tutorial if you haven't already. + +### Local Astarte instance + +To get started you'll need to setup a local Astarte instance that the message hub is going to +connect to. You can follow the +[Astarte quick instance guide](https://docs.astarte-platform.org/device-sdks/common/astarte_quick_instance.html). +If you follow the guide you'll have an Astarte instance, that must be accessible from the Message +Hub server, and all the keys necessary to register the Message Hub server as a new device. + +### Message Hub Server + +To register the server you can follow the guide and examples in +[the Astarte MessageHub repository](https://github.com/astarte-platform/astarte-message-hub). +Following this, we assume the Server is listening on the same machine IP v4 address `127.0.0.1` and +port `50051`. + +#### Client Auth + +At the moment the client doesn't require to Authenticate with the MessageHub, since we are assuming +they are on the same and secure LAN network. The only client's requirement for connecting to the +MessageHub is a unique `UUID`. + +## System dependencies + +The following dependencies are required to run the examples. + +### Rust + +We suggest using and up-to-date Rust toolchain, preferably the current stable version. Our +recommended way to install to install is through [rustup](https://rustup.rs/). If you prefer using +your system provided toolchain, make sure it's supported by the SDK Minimum Supported Rust Version +([MSRV](https://doc.rust-lang.org/cargo/reference/rust-version.html)). + +### SQLite + +> Currently we don't store the property in the SQLite store because the feature is not widely +> supported by other SDKs. This also implies messages with retention volatile ore stored are not +> saved if disconnected. +> +> Although they are not used, the SQLite libraries are still required to compile the application. + +The device SDK uses [SQLite](https://www.sqlite.org/) as an in-process database to store Astarte +properties on disk. In order to compile the application, you need to provide a compatible `sqlite3` +library. + +To use your system SQLite library, you need to have a C toolchain, `libsqlite3` and `pkg-config` +installed. This way you can link it with your Rust executable. For example, on a Debian/Ubuntu +system you install them through `apt`: + +```sh +apt install build-essential pkg-config libsqlite3-dev +``` + +You can find more information on the [rusqlite GitHub page](https://github.com/rusqlite/rusqlite). + +## Creating the project and adding the dependencies + +Fist of all, we will create a new binary Rust project: + +```bash +cargo new astarte-message-hub-client +cd astarte-message-hub-client +``` + +Then, we need to add the following dependencies to the project: + +- `astarte-device-sdk`: the client library to connect to the MessageHub +- `tokio` async runtime to handle the connection +- `Uuid` library to handle and generate UUIDs. + +We also suggest you to add the following dependencies + +- `tracing` and `tracing_subscriber` for creating and consuming log events +- `eyre` and `color-eyre` to handle the Errors and convert them in a human readable report + +You can run the following cargo add command: + +```sh +cargo add astarte-device-sdk --features=derive,message-hub +cargo add tokio --features=full +cargo add tracing tracing-subscriber +cargo add eyre color-eyre +``` + +The features of the `astarte-device-sdk` used in this guide will be: + +- `message-hub` will enable the connection to the MessageHub +- `derive` to derive the conversion of a Rust struct into a MessageHub message + +## Creating a Client and Connection + +Now that all the dependencies are installed, you need to create a +[`DeviceClient`](crate::client::DeviceClient) to send and receive events from the MessageHub and a +[`DeviceConnection`](crate::connection::DeviceConnection) to handle the connection/reconnection +events and lifetime. + +You will use the [`DeviceBuilder`](crate::builder::DeviceBuilder) and +[`GrpcConfig`](crate::transport::grpc::GrpcConfig) to set all the connection parameters used to talk +to the MessageHub. The parameter to configure is the gRPC +[Endpoint](crate::transport::grpc::tonic::transport::Endpoint) where the MessageHub is listening on +(`https://127.0.0.1:50051`). + + + +On the [`DeviceConnection`](crate::connection::DeviceConnection) you need call the +[`handle_events`](crate::connection::DeviceConnection), which is a blocking method that will handle +all the connection events. So, to not block the main task, we will use the +[`JoinSet`](tokio::task::JoinSet) to spawn multiple tasks and join each one at the end of the +program. + +```no_run +use astarte_device_sdk::{ + builder::DeviceBuilder, + prelude::*, + transport::grpc::{tonic::transport::Endpoint, Grpc, GrpcConfig, store::GrpcStore}, + DeviceClient, DeviceConnection, +}; +use tokio::task::JoinSet; +use tracing::{error, info}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use uuid::Uuid; + +/// Unique ID for the current application to identify to the message hub with. +const NODE_UUID: Uuid = uuid::uuid!("0444d8c3-f3f1-4b89-9e68-6ffb50ec1839"); +/// URL the MessageHub is listening on +const MESSAGE_HUB_URL: &str = "http://127.0.0.1:50051"; +/// Writable directory to store the persistent state of the device +const STORE_DIRECTORY: &str = "./store-dir"; + +async fn init() -> eyre::Result<( + DeviceClient, + DeviceConnection>, +)> { + tokio::fs::create_dir_all(&STORE_DIRECTORY).await?; + + let endpoint = Endpoint::from_static(&MESSAGE_HUB_URL); + let grpc_config = GrpcConfig::new(NODE_UUID, endpoint); + + let (client, connection) = DeviceBuilder::new() + .store_dir(STORE_DIRECTORY) + .await? + .connection(grpc_config) + .build() + .await?; + + Ok((client, connection)) +} + +#[tokio::main] +async fn main() -> eyre::Result<()> { + color_eyre::install()?; + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer()) + .try_init()?; + + let (_client, connection) = init().await?; + + info!("connected to the MessageHub"); + + let mut tasks = JoinSet::>::new(); + + // task to poll updates from the connection + tasks.spawn(async move { + connection.handle_events().await?; + + Ok(()) + }); + + // Cleanly close all the other tasks + tasks.spawn(async move { + tokio::signal::ctrl_c().await?; + + info!("SIGINT received, exiting"); + + Ok(()) + }); + + // handle tasks termination + while let Some(res) = tasks.join_next().await { + match res { + Ok(Ok(())) => { + // Close all the other tasks + tasks.abort_all(); + } + Ok(Err(err)) => { + error!("task returned an error"); + + return Err(err); + } + Err(err) if err.is_cancelled() => {} + Err(err) => { + error!("task panicked"); + + return Err(err.into()); + } + } + } + + info!("device disconnected"); + + Ok(()) +} +``` + +You can run the application with `cargo run` and see, in the MessageHub logs if the device attached +successfully. + +## Sending the introspection + +To communicate with Astarte we need to use the Interfaces which define the shape of our data. The +MessageHub doesn't know which Interfaces the applications are going to use. Each application will +register it's own interfaces in Astarte and send them to the MessageHub when connecting. + +### Registering the interfaces + +In the documentation we +[provided an interface of each type](https://github.com/astarte-platform/astarte-device-sdk-rust/tree/master/docs/interfaces) +to use to send and receive data between the Node and the MessageHub. + +You can use the following command to download the interfaces from the repo and sync them with +Astarte. + +```sh +# Inside the root of the project +mkdir interfaces +# Download the interfaces +curl --output-dir interfaces --fail --remote-name-all \ + 'https://raw.githubusercontent.com/astarte-platform/astarte-device-sdk-rust/refs/heads/master/docs/interfaces/org.astarte-platform.rust.get-started.IndividualDevice.json' \ + 'https://raw.githubusercontent.com/astarte-platform/astarte-device-sdk-rust/refs/heads/master/docs/interfaces/org.astarte-platform.rust.get-started.IndividualServer.json' \ + 'https://raw.githubusercontent.com/astarte-platform/astarte-device-sdk-rust/refs/heads/master/docs/interfaces/org.astarte-platform.rust.get-started.Aggregated.json' \ + 'https://raw.githubusercontent.com/astarte-platform/astarte-device-sdk-rust/refs/heads/master/docs/interfaces/org.astarte-platform.rust.get-started.Property.json' +# Sync them with Astarte +astartectl realm-management interfaces sync \ + --astarte-url http://astarte.localhost \ + --realm-name test \ + --realm-key test_private.pem \ + interfaces/*.json +``` + +Now all of the above interfaces must be visible in the Astarte dashboard. + +### Sending the introspection + +To send the Interfaces to the MessageHub and update the device introspection you can call the +[`interface_str`](crate::builder::DeviceBuilder::interface_str) method on the builder in the `init` +function that we declared previously. + +```no_run +# use astarte_device_sdk::{ +# builder::DeviceBuilder, +# prelude::*, +# transport::grpc::{tonic::transport::Endpoint, Grpc, GrpcConfig, store::GrpcStore}, +# DeviceClient, DeviceConnection, +# }; +# +# const NODE_UUID: uuid::Uuid = uuid::uuid!("0444d8c3-f3f1-4b89-9e68-6ffb50ec1839"); +# const MESSAGE_HUB_URL: &str = "http://127.0.0.1:50051"; +# const STORE_DIRECTORY: &str = "./store-dir"; +# +// NOTE: set the interface directory in your project, these are relative to this file +const AGGREGATED_DEVICE: &str = include_str!("../../docs/interfaces/org.astarte-platform.rust.get-started.Aggregated.json"); +const INDIVIDUAL_DEVICE: &str = include_str!("../../docs/interfaces/org.astarte-platform.rust.get-started.IndividualDevice.json"); +const INDIVIDUAL_SERVER: &str = include_str!("../../docs/interfaces/org.astarte-platform.rust.get-started.IndividualServer.json"); +const PROPERTY_DEVICE: &str = include_str!("../../docs/interfaces/org.astarte-platform.rust.get-started.Property.json"); + +async fn init() -> eyre::Result<( + DeviceClient, + DeviceConnection>, +)> { + tokio::fs::create_dir_all(&STORE_DIRECTORY).await?; + + let endpoint = Endpoint::from_static(&MESSAGE_HUB_URL); + let grpc_config = GrpcConfig::new(NODE_UUID, endpoint); + + let (client, connection) = DeviceBuilder::new() + .store_dir(STORE_DIRECTORY) + .await? + .interface_str(AGGREGATED_DEVICE)? + .interface_str(INDIVIDUAL_DEVICE)? + .interface_str(INDIVIDUAL_SERVER)? + .interface_str(PROPERTY_DEVICE)? + .connection(grpc_config) + .build() + .await?; + + Ok((client, connection)) +} +``` + +Now if you try to `cargo run` again you will see the node attaching to the MessageHub, and all the +interfaces will appear in the Device introspection on the Astarte Dashboard. + +## Receiving device events + +We can now spawn a task to receive data from Astarte. Using the +[`FromEvent`](crate::event::FromEvent) + +```no_run +# use astarte_device_sdk::{ +# builder::DeviceBuilder, +# prelude::*, +# transport::grpc::{tonic::transport::Endpoint, Grpc, GrpcConfig, store::GrpcStore}, +# DeviceClient, DeviceConnection, +# }; +# #[cfg(not(feature = "derive"))] +# use astarte_device_sdk_derive::FromEvent; +# #[cfg(feature = "derive")] +use astarte_device_sdk::FromEvent; +use astarte_device_sdk::client::RecvError; +use eyre::OptionExt; +use tracing::{info, error, warn}; + +/// Used to receive the IndividualDevice data. +/// +/// This need to be an enum because we deserialize each endpoint in it's own variables +#[derive(Debug, FromEvent)] +#[from_event( + interface = "org.astarte-platform.rust.get-started.IndividualServer", + aggregation = "individual" +)] +enum ServerIndividual { + #[mapping(endpoint = "/%{id}/data")] + Boolean(bool), +} + +async fn receive_data(client: DeviceClient) -> eyre::Result<()> { + loop { + let event = match client.recv().await { + Ok(event) => event, + Err(RecvError::Disconnected) => { + info!("client disconnected"); + return Ok(()); + } + Err(err) => { + error!(error = %eyre::Report::new(err), "received error from client"); + continue; + } + }; + + match event.interface.as_str() { + "org.astarte-platform.rust.get-started.IndividualServer" => { + // parse the path to extract the id part + // e.g. '/42/data' will strip the '/' and '/data' to return '42' + let id = event + .path + .strip_prefix("/") + .and_then(|s| s.strip_suffix("/data")) + .ok_or_eyre("couldn't get endpoint id parameter")? + .to_string(); + + let ServerIndividual::Boolean(event) = ServerIndividual::from_event(event)?; + + info!(event, id, "received new datastream on IndividualServer/"); + } + interface => { + warn!(interface, "unhandled interface event received"); + + continue; + } + } + } +} + +#[tokio::main] +async fn main() -> eyre::Result<()> { +# let mut tasks = tokio::task::JoinSet::new(); +# fn client() -> DeviceClient { todo!() } +# let client = client(); + // ... + + // receive events from the MessageHub + tasks.spawn(receive_data(client.clone())); + + // ... +# Ok(()) +} +``` + +You can send data from Astarte on a server-owned interface by using the +`astartectl appengine device publish-datastream` command and the Device Id of the message-hub (e.g. +`2TBn-jNESuuHamE2Zo1anA`), you need to specify the **interface name** +(`org.astarte-platform.rust.get-started.IndividualServer`) you want to publish on and the +**endpoint** (`/42/data` the `41` is in the place of the parameter `/%{id}/data`). Since in this +example the endpoint is of type double, we will send `3.14` as a value: + +```sh +astartectl appengine \ + --appengine-url 'http://astarte.localhost' \ + --realm-key 'test_private.pem' \ + --realm-name 'test' \ + devices publish-datastream '2TBn-jNESuuHamE2Zo1anA' 'org.astarte-platform.rust.get-started.IndividualServer' '/42/data' '3.14' +``` + +The MessageHub will then relay this event to all the Nodes having the given interface in their +introspection. + +## Sending data + +Finally, we can send data to the MessageHub. We implement a task similar to the receive one, in a +loop every 2 seconds we send the data to all the interfaces. The property one will set and save the +value, and will only send it once to the Server since it doesn't change. While for the `Aggregated` +interface we create a struct and derive the [`IntoAstarteObject`](crate::IntoAstarteObject) that will +convert the Rust struct in an Object Aggregate to send. + +```no_run +# use astarte_device_sdk::{ +# builder::DeviceBuilder, +# prelude::*, +# transport::grpc::{tonic::transport::Endpoint, Grpc, GrpcConfig, store::GrpcStore}, +# DeviceClient, DeviceConnection, +# }; +# +use std::time::Duration; + +# #[cfg(feature = "derive")] +use astarte_device_sdk::IntoAstarteObject; +# #[cfg(not(feature = "derive"))] +# use astarte_device_sdk_derive::IntoAstarteObject; + +/// Aggregated object +#[derive(Debug, IntoAstarteObject)] +struct AggregatedDevice { + double_endpoint: f64, + string_endpoint: String, +} + +/// Send data after an interval to every interface +async fn send_data(client: DeviceClient) -> eyre::Result<()> { + // Every 2 seconds send the data + let mut interval = tokio::time::interval(Duration::from_secs(2)); + + loop { + // Publish on the IndividualDevice + client + .send( + "org.astarte-platform.rust.get-started.IndividualDevice", + "/double_endpoint", + 3.14, + ) + .await?; + // Publish on the Aggregaed + let obj_data = AggregatedDevice { + double_endpoint: 42.0, + string_endpoint: "Sensor 1".to_string(), + }; + client + .send_object( + "org.astarte-platform.rust.get-started.Aggregated", + "/group_data", + obj_data.try_into()?, + ) + .await?; + // Set the Property + client + .send( + "org.astarte-platform.rust.get-started.Property", + "/double_endpoint", + 42.0, + ) + .await?; + + interval.tick().await; + } +} + +#[tokio::main] +async fn main() -> eyre::Result<()> { +# let mut tasks = tokio::task::JoinSet::new(); +# fn client() -> DeviceClient { todo!() } +# let client = client(); + // ... + + // send events to the MessageHub + tasks.spawn(send_data(client)); + + // ... +# Ok(()) +} +``` diff --git a/examples/individual_datastream/main.rs b/examples/individual_datastream/main.rs index 1486e8fb1..180cf0b1d 100644 --- a/examples/individual_datastream/main.rs +++ b/examples/individual_datastream/main.rs @@ -20,6 +20,7 @@ use std::time::{Duration, SystemTime}; +use eyre::OptionExt; use serde::Deserialize; use astarte_device_sdk::{ @@ -28,8 +29,7 @@ use astarte_device_sdk::{ }; use tokio::task::JoinSet; use tracing::error; - -type DynError = Box; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[derive(Deserialize)] struct Config { @@ -40,14 +40,17 @@ struct Config { } #[tokio::main] -async fn main() -> Result<(), DynError> { - env_logger::init(); +async fn main() -> eyre::Result<()> { + color_eyre::install()?; + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer()) + .try_init()?; + let now = SystemTime::now(); // Load configuration - let file = - std::fs::read_to_string("./examples/individual_datastream/configuration.json").unwrap(); - let cfg: Config = serde_json::from_str(&file).unwrap(); + let file = std::fs::read_to_string("./examples/individual_datastream/configuration.json")?; + let cfg: Config = serde_json::from_str(&file)?; let mut mqtt_config = MqttConfig::with_credential_secret( &cfg.realm, @@ -67,7 +70,7 @@ async fn main() -> Result<(), DynError> { let client_cl = client.clone(); println!("Connection to Astarte established."); - let mut tasks = JoinSet::>::new(); + let mut tasks = JoinSet::>::new(); // Create a task to transmit tasks.spawn(async move { @@ -113,7 +116,7 @@ async fn main() -> Result<(), DynError> { let led_id = iter .next() .and_then(|id| id.parse::().ok()) - .ok_or("Incorrect error received.")?; + .ok_or_eyre("Incorrect error received.")?; match iter.next() { Some("enable") => { @@ -124,7 +127,7 @@ async fn main() -> Result<(), DynError> { ); } Some("intensity") => { - let value: f64 = var.try_into().unwrap(); + let value: f64 = var.try_into()?; println!( "Received new intensity datastream for LED number {}. LED intensity is now {}", led_id, value diff --git a/examples/individual_properties/main.rs b/examples/individual_properties/main.rs index 052882382..879c3e187 100644 --- a/examples/individual_properties/main.rs +++ b/examples/individual_properties/main.rs @@ -18,17 +18,16 @@ * SPDX-License-Identifier: Apache-2.0 */ -use std::error::Error as StdError; - +use eyre::OptionExt; use serde::{Deserialize, Serialize}; use astarte_device_sdk::{ builder::DeviceBuilder, client::RecvError, error::Error, prelude::*, store::SqliteStore, transport::mqtt::MqttConfig, Value, }; -use tracing::error; - -type DynError = Box; +use tokio::task::JoinSet; +use tracing::{error, info}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[derive(Serialize, Deserialize, Debug)] struct Config { @@ -57,13 +56,15 @@ async fn get_name_for_sensor( } #[tokio::main] -async fn main() -> Result<(), DynError> { - env_logger::init(); +async fn main() -> eyre::Result<()> { + color_eyre::install()?; + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer()) + .try_init()?; // Load configuration - let file = - std::fs::read_to_string("./examples/individual_properties/configuration.json").unwrap(); - let cfg: Config = serde_json::from_str(&file).unwrap(); + let file = std::fs::read_to_string("./examples/individual_properties/configuration.json")?; + let cfg: Config = serde_json::from_str(&file)?; // Open the database, create it if it does not exists let db = @@ -85,24 +86,29 @@ async fn main() -> Result<(), DynError> { .connection(mqtt_config) .build() .await?; - let device_cpy = client.clone(); + let client_cl = client.clone(); println!("Connection to Astarte established."); + let mut tasks = JoinSet::>::new(); + // Create an thread to transmit - tokio::task::spawn(async move { + tasks.spawn(async move { let mut i: u32 = 0; println!("Properties values at startup:"); // Check the value of the name property for sensors 1 - if let Ok(name) = get_name_for_sensor(&device_cpy, 1).await { - println!(" - Property \"name\" for sensor 1 has value: \"{name}\""); + if let Ok(name) = get_name_for_sensor(&client_cl, 1).await { + info!(" - Property \"name\" for sensor 1 has value: \"{name}\""); if name != *"None" { - i = name.strip_prefix("name number ").unwrap().parse().unwrap(); + i = name + .strip_prefix("name number ") + .ok_or_eyre("couldn't strip prefix")? + .parse()?; } } // Check the value of the name property for sensors 2 - if let Ok(name) = get_name_for_sensor(&device_cpy, 2).await { + if let Ok(name) = get_name_for_sensor(&client_cl, 2).await { println!(" - Property \"name\" for sensor 2 has value: \"{name}\""); } @@ -111,14 +117,14 @@ async fn main() -> Result<(), DynError> { // Send in a loop the change of the property "name" of sensor 1 loop { - device_cpy + client_cl .send( "org.astarte-platform.rust.examples.individual-properties.DeviceProperties", "/1/name", format!("name number {i}"), ) - .await - .unwrap(); + .await?; + println!("Sent property \"name\" for sensor 1 with new value \"name number {i}\""); i += 1; tokio::time::sleep(std::time::Duration::from_secs(5)).await; @@ -126,7 +132,7 @@ async fn main() -> Result<(), DynError> { }); // Use the current thread to receive changes in the server owned properties - tokio::spawn(async move { + tasks.spawn(async move { loop { match client.recv().await { Ok(data) => { @@ -135,7 +141,7 @@ async fn main() -> Result<(), DynError> { let sensor_id = iter .next() .and_then(|id| id.parse::().ok()) - .ok_or("Incorrect error received.")?; + .ok_or_eyre("Incorrect error received.")?; match iter.next() { Some("enable") => { @@ -146,7 +152,7 @@ async fn main() -> Result<(), DynError> { ); } Some("samplingPeriod") => { - let value: i32 = var.try_into().unwrap(); + let value: i32 = var.try_into()?; println!("Sampling period for sensor {} is {}", sensor_id, value); } _ => {} @@ -158,10 +164,26 @@ async fn main() -> Result<(), DynError> { } } - Ok::<_, DynError>(()) + Ok(()) + }); + + tasks.spawn(async move { + connection.handle_events().await?; + + Ok(()) }); - connection.handle_events().await?; + while let Some(res) = tasks.join_next().await { + match res { + Ok(res) => { + res?; + + tasks.abort_all(); + } + Err(err) if err.is_cancelled() => {} + Err(err) => return Err(err.into()), + } + } Ok(()) } diff --git a/examples/message_hub_client/README.md b/examples/message_hub_client/README.md new file mode 100644 index 000000000..93e781d11 --- /dev/null +++ b/examples/message_hub_client/README.md @@ -0,0 +1,25 @@ + + +# Message Hub Client + +To follow this example read the step by step guide at +[Connect to the Astarte MessageHub](https://docs.rs/astarte-device-sdk/latest/astarte_device_sdk/_docs/_connect_to_the_astarte_msghub/index.html). diff --git a/examples/message_hub_client/main.rs b/examples/message_hub_client/main.rs new file mode 100644 index 000000000..dbbd4cf87 --- /dev/null +++ b/examples/message_hub_client/main.rs @@ -0,0 +1,233 @@ +// This file is part of Astarte. +// +// Copyright 2025 SECO Mind Srl +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +//! Example on connecting to the Astarte MessageHub. + +use std::{f64, time::Duration}; + +use astarte_device_sdk::{ + builder::DeviceBuilder, + client::RecvError, + prelude::*, + transport::grpc::{store::GrpcStore, tonic::transport::Endpoint, Grpc, GrpcConfig}, + DeviceClient, DeviceConnection, +}; +use eyre::OptionExt; +use tokio::task::JoinSet; +use tracing::{error, info, warn}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use uuid::Uuid; + +/// Unique ID for the current application to identify to the message hub with. +const NODE_UUID: Uuid = uuid::uuid!("0444d8c3-f3f1-4b89-9e68-6ffb50ec1839"); +/// URL the MessageHub is listening on +const MESSAGE_HUB_URL: &str = "http://127.0.0.1:50051"; +/// Writable directory to store the persistent state of the device +const STORE_DIRECTORY: &str = "./store-dir"; + +const AGGREGATED_DEVICE: &str = + include_str!("../../docs/interfaces/org.astarte-platform.rust.get-started.Aggregated.json"); +const INDIVIDUAL_DEVICE: &str = include_str!( + "../../docs/interfaces/org.astarte-platform.rust.get-started.IndividualDevice.json" +); +const INDIVIDUAL_SERVER: &str = include_str!( + "../../docs/interfaces/org.astarte-platform.rust.get-started.IndividualServer.json" +); +const PROPERTY_DEVICE: &str = + include_str!("../../docs/interfaces/org.astarte-platform.rust.get-started.Property.json"); + +/// Used to receive the IndividualDevice data. +/// +/// This need to be an enum because we deserialize each endpoint in it's own variables +#[derive(Debug, FromEvent)] +#[from_event( + interface = "org.astarte-platform.rust.get-started.IndividualServer", + aggregation = "individual" +)] +enum ServerIndividual { + #[mapping(endpoint = "/%{id}/data")] + Double(f64), +} + +async fn init() -> eyre::Result<( + DeviceClient, + DeviceConnection>, +)> { + tokio::fs::create_dir_all(&STORE_DIRECTORY).await?; + + let endpoint = Endpoint::from_static(MESSAGE_HUB_URL); + let grpc_config = GrpcConfig::new(NODE_UUID, endpoint); + + let (client, connection) = DeviceBuilder::new() + .store_dir(STORE_DIRECTORY) + .await? + .interface_str(AGGREGATED_DEVICE)? + .interface_str(INDIVIDUAL_DEVICE)? + .interface_str(INDIVIDUAL_SERVER)? + .interface_str(PROPERTY_DEVICE)? + .connection(grpc_config) + .build() + .await?; + + Ok((client, connection)) +} + +async fn receive_data(client: DeviceClient) -> eyre::Result<()> { + loop { + let event = match client.recv().await { + Ok(event) => event, + Err(RecvError::Disconnected) => { + info!("client disconnected"); + return Ok(()); + } + Err(err) => { + error!(error = %eyre::Report::new(err), "received error from client"); + continue; + } + }; + + match event.interface.as_str() { + "org.astarte-platform.rust.get-started.IndividualServer" => { + // parse the path to extract the id part + // e.g. '/42/data' will strip the '/' and '/data' to return '42' + let id = event + .path + .strip_prefix("/") + .and_then(|s| s.strip_suffix("/data")) + .ok_or_eyre("couldn't get endpoint id parameter")? + .to_string(); + + let ServerIndividual::Double(value) = ServerIndividual::from_event(event)?; + + info!(id, value, "received new datastream on IndividualServer"); + } + interface => { + warn!(interface, "unhandled interface event received"); + + continue; + } + } + } +} + +/// Aggregated object +#[derive(Debug, IntoAstarteObject)] +struct AggregatedDevice { + double_endpoint: f64, + string_endpoint: String, +} + +/// Send data after an interval to every interface +async fn send_data(client: DeviceClient) -> eyre::Result<()> { + // Every 2 seconds send the data + let mut interval = tokio::time::interval(Duration::from_secs(2)); + + loop { + // Publish on the IndividualDevice + client + .send( + "org.astarte-platform.rust.get-started.IndividualDevice", + "/double_endpoint", + 42.6, + ) + .await?; + // Publish on the Aggregaed + let obj_data = AggregatedDevice { + double_endpoint: 42.0, + string_endpoint: "Sensor 1".to_string(), + }; + client + .send_object( + "org.astarte-platform.rust.get-started.Aggregated", + "/group_data", + obj_data.try_into()?, + ) + .await?; + // Set the Property + client + .send( + "org.astarte-platform.rust.get-started.Property", + "/double_endpoint", + 42.0, + ) + .await?; + + interval.tick().await; + } +} + +#[tokio::main] +async fn main() -> eyre::Result<()> { + color_eyre::install()?; + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer()) + .try_init()?; + + let (client, connection) = init().await?; + + info!("connected to the MessageHub"); + + let mut tasks = JoinSet::>::new(); + + // task to poll updates from the connection + tasks.spawn(async move { + connection.handle_events().await?; + + Ok(()) + }); + + // receive events from the MessageHub + tasks.spawn(receive_data(client.clone())); + + // send data to the MessageHub + tasks.spawn(send_data(client)); + + // cleanly close all the other tasks + tasks.spawn(async move { + tokio::signal::ctrl_c().await?; + + info!("SIGINT received, exiting"); + + Ok(()) + }); + + // handle tasks termination + while let Some(res) = tasks.join_next().await { + match res { + Ok(Ok(())) => { + // Close all the other tasks + tasks.abort_all(); + } + Ok(Err(err)) => { + error!("task returned an error"); + + return Err(err); + } + Err(err) if err.is_cancelled() => {} + Err(err) => { + error!("task panicked"); + + return Err(err.into()); + } + } + } + + info!("device disconnected"); + + Ok(()) +} diff --git a/examples/object_datastream/main.rs b/examples/object_datastream/main.rs index 8ea6b78b2..9575b443a 100644 --- a/examples/object_datastream/main.rs +++ b/examples/object_datastream/main.rs @@ -16,16 +16,16 @@ // // SPDX-License-Identifier: Apache-2.0 +use std::time::Duration; + use serde::{Deserialize, Serialize}; -#[cfg(feature = "derive")] use astarte_device_sdk::IntoAstarteObject; use astarte_device_sdk::{ - builder::DeviceBuilder, error::Error, prelude::*, store::memory::MemoryStore, - transport::mqtt::MqttConfig, + builder::DeviceBuilder, prelude::*, store::memory::MemoryStore, transport::mqtt::MqttConfig, }; -#[cfg(not(feature = "derive"))] -use astarte_device_sdk_derive::IntoAstarteObject; +use tokio::task::JoinSet; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[derive(Serialize, Deserialize)] struct Config { @@ -43,12 +43,15 @@ struct DataObject { } #[tokio::main] -async fn main() -> Result<(), Error> { - env_logger::init(); +async fn main() -> eyre::Result<()> { + color_eyre::install()?; + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer()) + .try_init()?; // Load configuration - let file = std::fs::read_to_string("./examples/object_datastream/configuration.json").unwrap(); - let cfg: Config = serde_json::from_str(&file).unwrap(); + let file = std::fs::read_to_string("./examples/object_datastream/configuration.json")?; + let cfg: Config = serde_json::from_str(&file)?; let mut mqtt_config = MqttConfig::with_credential_secret( &cfg.realm, @@ -67,8 +70,11 @@ async fn main() -> Result<(), Error> { .build() .await?; + let mut tasks = JoinSet::>::new(); + // Create an thread to transmit - tokio::task::spawn(async move { + tasks.spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(5)); loop { let data = DataObject { endpoint1: 1.34, @@ -83,15 +89,30 @@ async fn main() -> Result<(), Error> { "/23", data.try_into().unwrap(), ) - .await - .unwrap(); + .await?; - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + interval.tick().await; } }); // Use the current thread to handle the connection (no incoming messages are expected in this example) - connection.handle_events().await?; + tasks.spawn(async move { + connection.handle_events().await?; + + Ok(()) + }); + + while let Some(res) = tasks.join_next().await { + match res { + Ok(res) => { + res?; + + tasks.abort_all(); + } + Err(err) if err.is_cancelled() => {} + Err(err) => return Err(err.into()), + } + } Ok(()) } diff --git a/examples/registration/main.rs b/examples/registration/main.rs index af4642521..71e468bbe 100644 --- a/examples/registration/main.rs +++ b/examples/registration/main.rs @@ -18,6 +18,8 @@ * SPDX-License-Identifier: Apache-2.0 */ use serde::{Deserialize, Serialize}; +use tracing::info; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[derive(Serialize, Deserialize, Debug)] struct Config { @@ -28,17 +30,17 @@ struct Config { } #[tokio::main] -async fn main() { - env_logger::init(); +async fn main() -> eyre::Result<()> { + color_eyre::install()?; + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer()) + .try_init()?; // Load configuration let file = std::fs::read_to_string("./examples/registration/configuration.json").unwrap(); let cfg: Config = serde_json::from_str(&file).unwrap(); - println!( - "Attempting to register the device with the ID: {}", - cfg.device_id - ); + info!(%cfg.device_id, "attempting to register the device"); let credentials_secret = astarte_device_sdk::transport::mqtt::registration::register_device( &cfg.pairing_token, @@ -49,5 +51,10 @@ async fn main() { .await .unwrap(); - println!("Device registered, received credentials secret is: {credentials_secret}"); + info!( + credentials_secret, + "device registered, received credentials secret" + ); + + Ok(()) } diff --git a/examples/retention/main.rs b/examples/retention/main.rs index 07a1a5a33..a6d37a2c7 100644 --- a/examples/retention/main.rs +++ b/examples/retention/main.rs @@ -16,11 +16,10 @@ // // SPDX-License-Identifier: Apache-2.0 -use std::{env::VarError, error::Error, time::Duration}; +use std::{env::VarError, time::Duration}; use astarte_device_sdk::{builder::DeviceBuilder, prelude::*, transport::mqtt::MqttConfig}; - -type DynError = Box; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; const INTERFACE_STORED: &str = include_str!( "./interfaces/org.astarte-platform.rust.examples.individual-datastream.StoredDeviceDatastream.json" @@ -47,8 +46,11 @@ fn get_env(name: &'static str) -> Result { } #[tokio::main] -async fn main() -> Result<(), DynError> { - env_logger::init(); +async fn main() -> eyre::Result<()> { + color_eyre::install()?; + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer()) + .try_init()?; let realm = get_env("ASTARTE_REALM")?; let device_id = get_env("ASTARTE_DEVICE_ID")?; diff --git a/src/_docs/_connect_to_the_astarte_msghub.rs b/src/_docs/_connect_to_the_astarte_msghub.rs new file mode 100644 index 000000000..25531bbe3 --- /dev/null +++ b/src/_docs/_connect_to_the_astarte_msghub.rs @@ -0,0 +1,20 @@ +// This file is part of Astarte. +// +// Copyright 2025 SECO Mind Srl +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +//! Example on how to connect to the Astarte MessageHub. +#![doc = include_str!("../../docs/connect-to-the-astarte-message-hub.md")] diff --git a/src/_docs/mod.rs b/src/_docs/mod.rs index 81e8f0e75..50d873224 100644 --- a/src/_docs/mod.rs +++ b/src/_docs/mod.rs @@ -17,4 +17,8 @@ // SPDX-License-Identifier: Apache-2.0 //! Documentation and examples on how to use the library. + +#[cfg(feature = "message-hub")] +#[cfg_attr(docsrs, doc(cfg(feature = "message-hub")))] +pub mod _connect_to_the_astarte_msghub; pub mod _get_started; diff --git a/src/client.rs b/src/client.rs index da95b1fe3..c9a973a42 100644 --- a/src/client.rs +++ b/src/client.rs @@ -253,7 +253,7 @@ pub trait ClientDisconnect { pub struct DeviceClient { pub(crate) interfaces: Arc>, // We use flume instead of the mpsc channel for the DeviceEvents for the connection to che - // client since we need the Receiver end to be clonable. Flume provides an async mpmc + // client since we need the Receiver end to be cloneable. Flume provides an async mpmc // channel/queue that fits our needs and doesn't suffer from the "slow receiver" problem. // Since it doesn't block the sender till all the receivers have read the msg. Unlike the // Tokio broadcast channel (another mpmc channel implementation).