diff --git a/Cargo.toml b/Cargo.toml index 06d89758..1532d8df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ bevy_time = "0.16" bevy_utils = "0.16" clap = { version = "4.5.23", features = ["derive"] } mime_guess = "2.0.5" +rclrs = { git = "https://github.com/mxgrey/ros2_rust", branch = "crossflow", features = ["serde", "schemars"] } schemars = "0.9.0" serde = "1.0.219" serde_json = "1.0.140" @@ -95,6 +96,10 @@ zenoh = { workspace = true, features = ["unstable"], optional = true } zenoh-ext = { workspace = true, features = ["unstable"], optional = true } # --- end zenoh +# --- Dependencies for ros2 feature +rclrs = { workspace = true, optional = true } +# --- end ros2 + tracing = { workspace = true } strum = { version = "0.26.3", optional = true, features = ["derive"] } semver = { version = "1.0.24", optional = true } @@ -137,6 +142,13 @@ zenoh = [ "dep:tonic-prost-build", ] +ros2 = [ + "dep:rclrs", + "dep:futures-lite", + "dep:serde", + "dep:schemars", +] + # Turn on all capability related features. This differs from --all-features # because it would not include single_threaded_async which has a diminishing # effect on capabilities. @@ -145,6 +157,7 @@ maximal = [ "trace", "grpc", "zenoh", + "ros2", ] [dev-dependencies] @@ -157,6 +170,7 @@ test-log = { version = "0.2.16", features = [ members = [ "examples/diagram/calculator", "examples/zenoh-examples", + "examples/ros2", "diagram-editor", "diagram-editor/wasm", "examples/diagram/calculator_wasm", @@ -171,3 +185,6 @@ doc = false [build-dependencies] tonic-prost-build = { workspace = true, optional = true } + +[patch.crates-io] +rclrs = { git = "https://github.com/mxgrey/ros2_rust", branch = "crossflow" } diff --git a/README.md b/README.md index e7d9af91..9ce043f1 100644 --- a/README.md +++ b/README.md @@ -5,9 +5,64 @@ [![Crates.io Version](https://img.shields.io/crates/v/crossflow)](https://crates.io/crates/crossflow) > [!IMPORTANT] -> For the ROS 2 integration feature, check out the [`ros2` branch](https://github.com/open-rmf/crossflow/tree/ros2). -> -> That feature is kept separate for now because it requires additional non-Rust setup. It will be merged into `main` after dynamic message introspection is finished. +> You are on a branch with experimental support for ROS 2 via [`rclrs`](https://github.com/ros2-rust/ros2_rust). +> This will require more steps to set up than usual, so installation instructions for the necessary packages are below: + +1. Install ROS Jazzy according to the [normal installation instructions](https://docs.ros.org/en/jazzy/Installation.html). + +2. Install Rust according to the [normal installation instructions](https://www.rust-lang.org/tools/install). + +3. Run these commands to set up the workspace with the message bindings: + +```bash +sudo apt install -y git libclang-dev python3-pip python3-vcstool # libclang-dev is required by bindgen +``` + +```bash +pip install git+https://github.com/colcon/colcon-cargo.git --break-system-packages +``` + +```bash +pip install git+https://github.com/colcon/colcon-ros-cargo.git --break-system-packages +``` + +For now we need a fork of `cargo-ament-build` until [this PR](https://github.com/ros2-rust/cargo-ament-build/pull/26) is merged and released: + +```bash +cargo install --git https://github.com/mxgrey/cargo-ament-build +``` + +Create a workspace with the necessary repos: + +```bash +mkdir -p workspace/src && cd workspace +``` + +```bash +git clone https://github.com/open-rmf/crossflow src/crossflow -b ros2 +``` + +```bash +vcs import src < src/crossflow/ros2-feature.repos +``` + +Source the ROS distro and build the workspace: + +```bash +source /opt/ros/jazzy/setup.bash +``` + +```bash +colcon build --allow-overriding action_msgs builtin_interfaces common_interfaces composition_interfaces example_interfaces geometry_msgs lifecycle_msgs nav_msgs rcl_interfaces rosgraph_msgs rosidl_default_generators rosidl_default_runtime sensor_msgs sensor_msgs_py service_msgs statistics_msgs std_msgs std_srvs trajectory_msgs type_description_interfaces unique_identifier_msgs visualization_msgs +``` + +4. After `colcon build` has finished, you should see a `.cargo/config.toml` file inside your workspace, with `[patch.crates-io.___]` sections pointing to the generated message bindings. Now you should source the workspace using + +```bash +source install/setup.bash +``` + +Now you can run the [ROS 2 example](examples/ros2/README.md). You can also create your own crate in this colcon workspace and link as shown in the example. # Reactive Programming for Bevy diff --git a/diagram-editor/server/basic_executor.rs b/diagram-editor/server/basic_executor.rs index 60efcdfe..6834cec3 100644 --- a/diagram-editor/server/basic_executor.rs +++ b/diagram-editor/server/basic_executor.rs @@ -56,8 +56,8 @@ pub struct RunArgs { #[arg(help = "path to the diagram to run")] diagram: String, - #[arg(help = "json containing the request to the diagram")] - request: String, + #[arg(help = "json containing the request to the diagram. Defaults to null if not set.")] + request: Option, } #[derive(Parser, Debug)] @@ -72,7 +72,7 @@ pub fn headless(args: RunArgs, registry: DiagramElementRegistry) -> Result<(), B let file = File::open(args.diagram).unwrap(); let diagram = Diagram::from_reader(file)?; - let request = serde_json::Value::from_str(&args.request)?; + let request = serde_json::Value::from_str(args.request.as_ref().map(|s| s.as_str()).unwrap_or("null"))?; let mut promise = app.world_mut() .command(|cmds| -> Result, DiagramError> { diff --git a/examples/ros2/Cargo.toml b/examples/ros2/Cargo.toml new file mode 100644 index 00000000..0d6c98fa --- /dev/null +++ b/examples/ros2/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "ros2-workflow-examples" +edition = "2021" +version = "0.0.1" + +[[bin]] +name = "nav-executor" +path = "src/nav_executor.rs" + +[[bin]] +name = "fake-plan-generator" +path = "src/fake_plan_generator.rs" + +[[bin]] +name = "goal-requester" +path = "src/goal_requester.rs" + +[dependencies] +# Dependency to help us run the diagram editor and executor +crossflow_diagram_editor = { version = "0.0.1", path = "../../diagram-editor", features = ["basic_executor"] } + +# Dependencies for the nav operation catalog +crossflow = { version = "0.0.1", path = "../..", features = ["diagram", "ros2"] } +serde = { workspace = true } +serde_json = { workspace = true } +schemars = { workspace = true } + +# Dependencies for our custom ROS operations +rclrs = { workspace = true } +nav_msgs = { version = "*", features = ["serde", "schemars"] } +geometry_msgs = { version = "*", features = ["serde", "schemars"] } +std_msgs = { version = "*", features = ["serde", "schemars"] } +builtin_interfaces = { version = "*", features = ["serde", "schemars"] } +example_interfaces = { version = "*", features = ["serde", "schemars"] } + +# Dependencies for goal-requester and fake-plan-generator +clap = { workspace = true } +rand = "0.9.2" diff --git a/examples/ros2/README.md b/examples/ros2/README.md new file mode 100644 index 00000000..c8f2ee0a --- /dev/null +++ b/examples/ros2/README.md @@ -0,0 +1,70 @@ +## Building workflows with ROS nodes + +The examples in this directory showcase how to incorporate ROS primitives into a workflow. We model a small portion of a navigation workflow that involves receiving goals, fetching paths to connect those goals from a planning service, and then queuing up the paths for execution. + +This diagram represents the workflow implemented by [`diagrams/default-nav-system.json`](diagrams/default-nav-system.json): + +![nav-example-workflow](assets/figures/nav-example.png) + +After following the build instructions in the [root README](../../README.md), run this example with the following steps: + +1. Source the workspace with + +```bash +source install/setup.bash # run in the root of your colcon workspace +``` + +2. Then you can go into this `examples/ros2` directory and run the example with + +```bash +cargo run --bin nav-executor -- run diagrams/default-nav-system.json # run in this directory +``` + +3. The `nav-example` workflow will wait until it receives some goals to process. You can send it some randomized goals by **opening a new terminal** in the same directory and running + +```bash +source install/setup.bash # run in the root of your colcon workspace +``` + +```bash +cargo run --bin goal-requester # run in this directory +``` + +Now the workflow will print out all the plans that it has received from the `fake-plan-generator` and stored in its buffer. If the workflow were connected to a real navigation system, it could pull these plans from the buffer one at a time and execute them. + +4. Our workflow also allows the paths to be cleared out of the buffer, i.e. "cancelled". To cancel the current set of goals, run: + +```bash +cargo run --bin goal-requester -- --cancel +``` + +After that you should see a printout of + +``` +Paths currently waiting to run: +[] +``` + +which indicates that the buffer has been successfully cleared out. + +5. You can view and edit the navigation workflow by running the web-based editor: + +```bash +cargo run --bin nav-executor -- serve # run in this directory +``` + +Open up a web browser (chrome works best) and go to [http://localhost:3000](http://localhost:3000). Use the upload button to load the default nav workflow from [`diagrams/default-nav-system.json`](diagrams/default-nav-system.json). After you've edited it, you can run it immediately using the play button (enter `null` into the input box that pops up). Or you can use the export button to download and save your changes. + +6. The diagram editor does not currently have a button for cancelling a workflow. To stop a workflow that is currently running, use + +```bash +ros2 topic pub end_workflow std_msgs/Empty +``` + +The default workflow will listen to the `/end_workflow` topic for any message, and then exit as soon as one is received. After you have triggerd the workflow to end, you should this message in the response: + +``` +"workflow ended by /end_workflow request" +``` + +7. To see how to register custom diagram operations and any ROS messages, services, or actions that you need, take a look at [`src/nav_ops_catalog.rs`](src/nav_ops_catalog.rs). diff --git a/examples/ros2/assets/figures/nav-example.png b/examples/ros2/assets/figures/nav-example.png new file mode 100644 index 00000000..2d249f69 Binary files /dev/null and b/examples/ros2/assets/figures/nav-example.png differ diff --git a/examples/ros2/diagrams/default-nav-system.json b/examples/ros2/diagrams/default-nav-system.json new file mode 100644 index 00000000..981aead2 --- /dev/null +++ b/examples/ros2/diagrams/default-nav-system.json @@ -0,0 +1,136 @@ +{ + "$schema": "https://raw.githubusercontent.com/open-rmf/crossflow/refs/heads/main/diagram.schema.json", + "version": "0.1.0", + "templates": {}, + "start": "setup", + "ops": { + "quit_request": { + "type": "transform", + "cel": "\"workflow ended by /end_workflow request\"", + "next": { + "builtin": "terminate" + } + }, + "end_workflow": { + "type": "node", + "builder": "nav_manager__std_msgs_msg_Empty__subscription", + "next": { + "builtin": "dispose" + }, + "display_text": "End Workflow", + "config": { + "topic": "end_workflow" + }, + "stream_out": { + "out": "quit_request" + } + }, + "setup": { + "type": "fork_clone", + "next": [ + "print_welcome", + "receive_cancel", + "end_workflow" + ] + }, + "print_welcome": { + "type": "node", + "builder": "print", + "config": { + "message": "Waiting to receive goals from 'request_goal' topic", + "include_value": false + }, + "next": "receive_goals" + }, + "receive_goals": { + "type": "node", + "builder": "nav_manager__nav_msgs_msg_Goals__subscription", + "config": { + "topic": "request_goal" + }, + "next": "quit_error", + "display_text": "Receive Goals", + "stream_out": { + "out": "fetch_plans" + } + }, + "quit_error": { + "type": "transform", + "cel": "\"error: unable to receive goals\"", + "next": { + "builtin": "terminate" + } + }, + "fetch_plans": { + "type": "node", + "builder": "fetch_plans", + "config": { + "planner_service": "get_plan", + "tolerance": 0.1 + }, + "next": { + "builtin": "dispose" + }, + "display_text": "Fetch Plans", + "stream_out": { + "paths": "path_buffer" + } + }, + "path_buffer": { + "type": "buffer", + "settings": { + "retention": "keep_all" + } + }, + "listen_to_path_buffer": { + "type": "listen", + "buffers": [ + "path_buffer" + ], + "next": "print_paths" + }, + "print_paths": { + "type": "node", + "builder": "print_from_buffer", + "config": "Paths currently waiting to run:\n", + "next": { + "builtin": "dispose" + }, + "display_text": "Print Paths" + }, + "receive_cancel": { + "type": "node", + "builder": "nav_manager__std_msgs_msg_Empty__subscription", + "config": { + "topic": "cancel_goals" + }, + "next": { + "builtin": "dispose" + }, + "display_text": "Cancel Goals", + "stream_out": { + "out": "trigger_clear_paths" + } + }, + "trigger_clear_paths": { + "type": "transform", + "cel": "null", + "next": "access_paths" + }, + "access_paths": { + "type": "buffer_access", + "buffers": [ + "path_buffer" + ], + "next": "clear_paths" + }, + "clear_paths": { + "type": "node", + "builder": "clear_paths", + "next": { + "builtin": "dispose" + }, + "display_text": "Clear Paths" + } + } +} diff --git a/examples/ros2/src/fake_plan_generator.rs b/examples/ros2/src/fake_plan_generator.rs new file mode 100644 index 00000000..7b604c83 --- /dev/null +++ b/examples/ros2/src/fake_plan_generator.rs @@ -0,0 +1,109 @@ +/* + * Copyright (C) 2025 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +use builtin_interfaces::msg::Time as TimeMsg; +use geometry_msgs::msg::{Point, Pose, PoseStamped, Quaternion}; +use nav_msgs::srv::{GetPlan, GetPlan_Request, GetPlan_Response}; +use rclrs::*; +use std_msgs::msg::Header; + +fn main() { + let context = Context::default_from_env().unwrap(); + let mut executor = context.create_basic_executor(); + + let node = executor.create_node("fake_plan_generator").unwrap(); + let logger = node.logger().clone(); + let _service = + node.create_service::("get_plan", move |request: GetPlan_Request| { + log!( + &logger, + "Serving a fake plan from:\n - {:?}\nto\n - {:?}", + request.start, + request.goal, + ); + // Just linearly interpolate 5 points between the start and the goal + let num_points = 5; + let mut response = GetPlan_Response::default(); + for i in 1..=num_points { + let s = i as f64 / num_points as f64; + response + .plan + .poses + .push(interpolate_pose(s, &request.start, &request.goal)); + } + + response + }); + + log!(node.logger(), "Ready to serve fake plans..."); + executor.spin(SpinOptions::default()); +} + +fn interpolate_pose(s: f64, start: &PoseStamped, goal: &PoseStamped) -> PoseStamped { + PoseStamped { + header: interpolate_header(s, &start.header, &goal.header), + pose: Pose { + position: interpolate_point(s, &start.pose.position, &goal.pose.position), + orientation: interpolate_orientation( + s, + &start.pose.orientation, + &goal.pose.orientation, + ), + }, + } +} + +fn interpolate_header(s: f64, start: &Header, goal: &Header) -> Header { + let t_start = secs_from_msg(&start.stamp); + let t_goal = secs_from_msg(&goal.stamp); + let t = s * (t_goal as f64 - t_start as f64) + t_start as f64; + let t = t as i64; + + Header { + stamp: msg_from_secs(t), + frame_id: start.frame_id.clone(), + } +} + +fn secs_from_msg(msg: &TimeMsg) -> i64 { + (msg.sec as i64 * 1_000_000_000) + msg.nanosec as i64 +} + +fn msg_from_secs(t: i64) -> TimeMsg { + let nanosec = (t % 1_000_000_000) as u32; + let sec = (t / 1_000_000_000) as i32; + TimeMsg { sec, nanosec } +} + +fn interpolate_point(s: f64, start: &Point, goal: &Point) -> Point { + Point { + x: s * (goal.x - start.x) + start.x, + y: s * (goal.y - start.y) + start.y, + z: s * (goal.z - start.z) + start.z, + } +} + +fn interpolate_orientation(s: f64, start: &Quaternion, goal: &Quaternion) -> Quaternion { + // Just do a rough slerp approximation. This is a fake planner so + // we don't need to do anything serious here. + Quaternion { + x: (1.0 - s) * start.x + s * goal.x, + y: (1.0 - s) * start.y + s * goal.y, + z: (1.0 - s) * start.z + s * goal.z, + w: (1.0 - s) * start.w + s * goal.w, + } +} diff --git a/examples/ros2/src/goal_requester.rs b/examples/ros2/src/goal_requester.rs new file mode 100644 index 00000000..1425b2fc --- /dev/null +++ b/examples/ros2/src/goal_requester.rs @@ -0,0 +1,124 @@ +/* + * Copyright (C) 2025 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +use clap::Parser; +use rclrs::*; + +use builtin_interfaces::msg::Time as TimeMsg; +use geometry_msgs::msg::{Point, Pose, PoseStamped, Quaternion}; +use nav_msgs::msg::Goals; +use std_msgs::msg::{Empty as EmptyMsg, Header}; + +use rand::prelude::*; + +fn main() { + let context = Context::default_from_env().unwrap(); + let args = Args::parse(); + let mut executor = context.create_basic_executor(); + let node = executor.create_node("goal_requester").unwrap(); + + let task = if args.cancel { + let topic = "cancel_goals"; + let publisher = node + .create_publisher::(topic.transient_local()) + .unwrap(); + + executor.commands().run(async move { + log!( + node.logger(), + "Waiting for a subscriber on topic {topic}..." + ); + let subscriber_ready = node.notify_on_graph_change({ + let publisher = publisher.clone(); + move || publisher.get_subscription_count().unwrap() > 0 + }); + + subscriber_ready.await.unwrap(); + + log!(node.logger(), "Requesting cancel"); + publisher.publish(EmptyMsg::default()).unwrap(); + }) + } else { + let topic = "request_goal"; + let publisher = node + .create_publisher::(topic.transient_local()) + .unwrap(); + + let mut goals = Vec::new(); + for i in 0..args.count { + let time = i as i32; + let mut rng = rand::rng(); + let x = rng.random(); + let y = rng.random(); + let z = 0.0; + + let orientation = Quaternion { + x: 0.0, + y: 0.0, + z: 0.0, + w: 1.0, + }; + + goals.push(PoseStamped { + header: Header { + stamp: TimeMsg { + sec: time, + nanosec: 0, + }, + frame_id: "map".to_string(), + }, + pose: Pose { + position: Point { x, y, z }, + orientation, + }, + }); + } + + let request = Goals { + header: Header { + stamp: TimeMsg::default(), + frame_id: "map".to_string(), + }, + goals, + }; + + executor.commands().run(async move { + log!(node.logger(), "Waiting for subscriber on topic {topic}..."); + let subscriber_ready = node.notify_on_graph_change({ + let publisher = publisher.clone(); + move || publisher.get_subscription_count().unwrap() > 0 + }); + + subscriber_ready.await.unwrap(); + + log!(node.logger(), "Publishing goals: {:#?}", request); + publisher.publish(request).unwrap(); + }) + }; + + executor.spin(SpinOptions::default().until_promise_resolved(task)); +} + +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// How many random goals to generate + #[arg(short, long, default_value_t = 3)] + count: u8, + #[arg(long)] + cancel: bool, +} diff --git a/examples/ros2/src/lib.rs b/examples/ros2/src/lib.rs new file mode 100644 index 00000000..6ae0354b --- /dev/null +++ b/examples/ros2/src/lib.rs @@ -0,0 +1,19 @@ +/* + * Copyright (C) 2025 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +mod nav_ops_catalog; +pub use nav_ops_catalog::register_nav_catalog; diff --git a/examples/ros2/src/nav_executor.rs b/examples/ros2/src/nav_executor.rs new file mode 100644 index 00000000..5e65faa7 --- /dev/null +++ b/examples/ros2/src/nav_executor.rs @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2025 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +use ros2_workflow_examples::register_nav_catalog; +use rclrs::*; +use crossflow_diagram_editor::basic_executor::{self, DiagramElementRegistry, Error}; + +fn main() -> Result<(), Box> { + // Set up the critical rclrs components + let context = Context::default_from_env().unwrap(); + let mut executor = context.create_basic_executor(); + let node = executor.create_node("nav_manager").unwrap(); + + // Set up the custom ROS registry and get rclrs executor running + let mut registry = DiagramElementRegistry::new(); + register_nav_catalog(&mut registry, &node); + + // Get the rclrs executor running its own thread + let executor_commands = executor.commands().clone(); + let executor_thread = std::thread::spawn(move || executor.spin(SpinOptions::default())); + + // Run the workflow execution program with our custom registry + let r = basic_executor::run(registry); + + // Close down the executor and join its thread gracefully to avoid noisy thread-crashing errors + executor_commands.halt_spinning(); + let _ = executor_thread.join(); + + r +} diff --git a/examples/ros2/src/nav_ops_catalog.rs b/examples/ros2/src/nav_ops_catalog.rs new file mode 100644 index 00000000..b0d4dcb7 --- /dev/null +++ b/examples/ros2/src/nav_ops_catalog.rs @@ -0,0 +1,177 @@ +/* + * Copyright (C) 2025 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +use crossflow::prelude::*; +use rclrs::{Node as Ros2Node, Promise, *}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use std::collections::VecDeque; + +use nav_msgs::{ + msg::{Goals, Path}, + srv::{GetPlan, GetPlan_Request, GetPlan_Response}, +}; + +use example_interfaces::action::Fibonacci; + +use std_msgs::msg::Empty as EmptyMsg; + +pub fn register_nav_catalog(registry: &mut DiagramElementRegistry, node: &Ros2Node) { + registry + .enable_ros2(node.clone()) + .register_ros2_message::() + .register_ros2_message::() + .register_ros2_message::() + .register_ros2_service::() + .register_ros2_action::(); + + // Create a client that fetches plans from a server. Each pair of + // consecutive goals is planned for independently, in parallel. + // + // After issuing every plan request, we will await them in order of first to + // last and stream them out from the node in that order, regardless of the + // order in which the plans arrive from the service. + // + // We make a custom node builder for this instead of using the generic service + // client builder because we want the input to be processed in a specific way: + // breaking it into multiple parallel requests and reassembling them in order + // as they complete. In the future we might introduce generalized "enumerate" + // and "collect" operations which could replace this. + registry.register_node_builder(NodeBuilderOptions::new("fetch_plans"), { + let node = node.clone(); + move |builder, config: PlanningConfig| { + let tolerance = config.tolerance; + let client = node + .create_client::(&config.planner_service) + .unwrap(); + + let logger = node.logger().clone(); + builder.create_map(move |input: AsyncMap| { + let client = client.clone(); + let logger = logger.clone(); + async move { + log!(&logger, "Waiting for planning service..."); + client.notify_on_service_ready().await.unwrap(); + + let mut from_iter = input.request.goals.iter(); + let mut to_iter = input.request.goals.iter().skip(1); + + let mut plan_promises = VecDeque::new(); + while let (Some(start), Some(goal)) = + (from_iter.next().cloned(), to_iter.next().cloned()) + { + log!(&logger, "Requesting a plan from {start:?} to {goal:?}"); + let request = GetPlan_Request { + start, + goal, + tolerance, + }; + let promise: Promise = client.call(request).unwrap(); + plan_promises.push_back(promise); + } + + while let Some(promise) = plan_promises.pop_front() { + let response = promise.await.unwrap(); + log!( + &logger, + "Received a plan from {:?} to {:?}", + response.plan.poses.first().map(|x| x.pose.clone()), + response.plan.poses.last().map(|x| x.pose.clone()), + ); + input.streams.paths.send(response.plan); + } + } + }) + } + }); + + // We can't really execute the paths in this example program, so let's just + // print whatever accumulates in the buffer. + registry + .opt_out() + .no_serializing() + .no_deserializing() + .register_node_builder( + NodeBuilderOptions::new("print_from_buffer"), + move |builder, msg: Option| { + let callback = move |In(key): In, world: &World| { + let buffer = world.json_buffer_view(&key).unwrap(); + let values: Result, _> = buffer.iter().collect(); + let values = values.unwrap(); + println!("{}{values:#?}", msg.as_ref().unwrap_or(&String::new())); + }; + + builder.create_node(callback.into_blocking_callback()) + }, + ) + .with_listen(); + + registry.register_node_builder( + NodeBuilderOptions::new("print"), + move |builder, + PrintConfig { + message, + include_value, + }: PrintConfig| { + builder.create_map_block(move |value: JsonMessage| { + if include_value { + println!("{message}{value:#?}"); + } else { + println!("{message}"); + } + value + }) + }, + ); + + registry + .opt_out() + .no_serializing() + .no_deserializing() + .register_node_builder( + NodeBuilderOptions::new("clear_paths"), + move |builder, _config: ()| { + let clear_paths = clear_paths.into_blocking_callback(); + builder.create_node(clear_paths) + }, + ) + .with_buffer_access(); +} + +#[derive(Serialize, Deserialize, JsonSchema)] +struct PlanningConfig { + planner_service: String, + tolerance: f32, +} + +#[derive(StreamPack)] +struct PathStream { + paths: Path, +} + +fn clear_paths(In((_, key)): In<((), BufferKey)>, mut access: BufferAccessMut) { + let mut buffer = access.get_mut(&key).unwrap(); + // Remove all goals from the buffer by draining its full range. + buffer.drain(..); +} + +#[derive(Serialize, Deserialize, JsonSchema)] +struct PrintConfig { + message: String, + include_value: bool, +} diff --git a/package.xml b/package.xml new file mode 100644 index 00000000..b20f1879 --- /dev/null +++ b/package.xml @@ -0,0 +1,26 @@ + + + + crossflow + 0.0.1 + General-purpose workflow execution + Michael X. Grey + Apache License 2.0 + Michael X. Grey + + rclrs + rosidl_runtime_rs + builtin_interfaces + + + geometry_msgs + nav_msgs + std_msgs + example_interfaces + + + ament_cargo + + diff --git a/ros2-feature.repos b/ros2-feature.repos new file mode 100644 index 00000000..61fe97d7 --- /dev/null +++ b/ros2-feature.repos @@ -0,0 +1,37 @@ +repositories: + ros2/common_interfaces: + type: git + url: https://github.com/ros2/common_interfaces.git + version: jazzy + ros2/example_interfaces: + type: git + url: https://github.com/ros2/example_interfaces.git + version: jazzy + ros2/rcl_interfaces: + type: git + url: https://github.com/ros2/rcl_interfaces.git + version: jazzy + ros2/test_interface_files: + type: git + url: https://github.com/ros2/test_interface_files.git + version: jazzy + ros2/rosidl_core: + type: git + url: https://github.com/ros2/rosidl_core.git + version: jazzy + ros2/rosidl_defaults: + type: git + url: https://github.com/ros2/rosidl_defaults.git + version: jazzy + ros2/unique_identifier_msgs: + type: git + url: https://github.com/ros2/unique_identifier_msgs.git + version: jazzy + ros2-rust/rosidl_rust: + type: git + url: https://github.com/mxgrey/rosidl_rust.git + version: crossflow + ros2-rust/rosidl_runtime_rs: + type: git + url: https://github.com/mxgrey/rosidl_runtime_rs.git + version: crossflow diff --git a/src/async_execution/mod.rs b/src/async_execution/mod.rs index d2dc40ba..e176ec0f 100644 --- a/src/async_execution/mod.rs +++ b/src/async_execution/mod.rs @@ -26,7 +26,7 @@ pub(crate) use single_threaded_execution::SingleThreadedExecution; #[cfg(feature = "single_threaded_async")] use single_threaded_execution::SingleThreadedExecutionSender; -pub(crate) use bevy_tasks::Task as TaskHandle; +pub use bevy_tasks::Task as TaskHandle; #[cfg(not(feature = "single_threaded_async"))] pub(crate) type CancelSender = AsyncComputeTaskPoolSender; diff --git a/src/diagram.rs b/src/diagram.rs index 76e403eb..71d858c6 100644 --- a/src/diagram.rs +++ b/src/diagram.rs @@ -38,6 +38,11 @@ pub mod grpc; #[cfg(feature = "zenoh")] pub mod zenoh; +#[cfg(feature = "ros2")] +mod ros2; +#[cfg(feature = "ros2")] +pub use ros2::*; + use bevy_derive::{Deref, DerefMut}; use bevy_ecs::system::Commands; use buffer_schema::{BufferAccessSchema, BufferSchema, ListenSchema}; diff --git a/src/diagram/ros2.rs b/src/diagram/ros2.rs new file mode 100644 index 00000000..b5879771 --- /dev/null +++ b/src/diagram/ros2.rs @@ -0,0 +1,219 @@ +/* + * Copyright (C) 2025 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +use super::*; +use crate::BuildRos2; +use rclrs::{ + ActionClientOptions, ActionIDL, GoalStatus, GoalStatusCode, MessageIDL, Node as Ros2Node, + PrimitiveOptions, QosOptions, ServiceIDL, +}; +use serde::de::DeserializeOwned; +use tokio::sync::mpsc::UnboundedSender; + +impl DiagramElementRegistry { + /// Use this to register workflow node builders that will produce subscriptions, + /// publishers, service clients, and action clients belong to a specific ROS 2 + /// node. + /// + /// This will return a [`Ros2Registry`] which you can use to register messages + /// (for subscriptions and publishers), services, and actions. + /// + /// The node builders that get registered will be named + /// `{node_name}_{type_name}_{subscription|publisher|service_client|action_client}` + #[must_use = "no node builders will be created unless you register the specific messages, services, or actions that you need"] + pub fn enable_ros2(&mut self, ros2_node: Ros2Node) -> Ros2Registry<'_> { + Ros2Registry { + ros2_node, + registry: self, + } + } +} + +/// Use this to register message, service, and action bindings so that the +/// registry can build workflow nodes for ROS 2 subscriptions, publishers, +/// service clients, and action clients. +pub struct Ros2Registry<'a> { + ros2_node: Ros2Node, + registry: &'a mut DiagramElementRegistry, +} + +impl<'a> Ros2Registry<'a> { + /// Register a message definition to obtain node builders for publishers and + /// subscriptions for this message type. + pub fn register_ros2_message( + &mut self, + ) -> &'_ mut Self { + let node_name_snake = self.ros2_node.name().replace("/", "_"); + let message_name_minimal = T::TYPE_NAME.split("/").last().unwrap_or(""); + let message_name_snake = T::TYPE_NAME.replace("/", "_"); + + let ros2_node = self.ros2_node.clone(); + self.registry + .register_node_builder( + NodeBuilderOptions::new(format!( + "{node_name_snake}__{message_name_snake}__subscription" + )) + .with_default_display_text(format!("{message_name_minimal} Subscription")), + move |builder, config: PrimitiveOptions| { + builder.create_ros2_subscription::(ros2_node.clone(), config) + }, + ) + .with_result(); + + // For cancel sender + self.registry + .opt_out() + .no_serializing() + .no_deserializing() + .register_message::>(); + + let ros2_node = self.ros2_node.clone(); + self.registry + .register_node_builder_fallible( + NodeBuilderOptions::new(format!( + "{node_name_snake}__{message_name_snake}__publisher" + )) + .with_default_display_text(format!("{message_name_minimal} Publisher",)), + move |builder, config: PrimitiveOptions| { + let node = builder.create_ros2_publisher::(ros2_node.clone(), config)?; + Ok(node) + }, + ) + .with_result(); + + self + } + + /// Register a service definition to obtain a node builder for a service + /// client that can use this service definition. + pub fn register_ros2_service(&mut self) -> &'_ mut Self + where + S::Request: Serialize + DeserializeOwned + JsonSchema, + S::Response: Serialize + DeserializeOwned + JsonSchema, + { + let node_name_snake = self.ros2_node.name().replace("/", "_"); + let service_name_minimal = S::TYPE_NAME.split("/").last().unwrap_or(""); + let service_name_snake = S::TYPE_NAME.replace("/", "_"); + + let ros2_node = self.ros2_node.clone(); + self.registry + .register_node_builder_fallible( + NodeBuilderOptions::new(format!("{node_name_snake}__{service_name_snake}__client")) + .with_default_display_text(format!("{service_name_minimal} Client")), + move |builder, config: PrimitiveOptions| { + let node = builder + .create_ros2_service_client::(ros2_node.clone(), config)?; + Ok(node) + }, + ) + .with_result(); + + self.registry + .register_message::>() + .with_result(); + + // For cancel sender + self.registry + .opt_out() + .no_serializing() + .no_deserializing() + .register_message::>(); + + self + } + + /// Register an action definition to obtain a node builder for an action + /// client that can use this action definition. + pub fn register_ros2_action(&mut self) -> &'_ mut Self + where + A::Goal: Serialize + DeserializeOwned + JsonSchema, + A::Result: Serialize + DeserializeOwned + JsonSchema, + A::Feedback: Serialize + DeserializeOwned + JsonSchema, + { + let node_name_snake = self.ros2_node.name().replace("/", "_"); + let action_name_minimal = A::TYPE_NAME.split("/").last().unwrap_or(""); + let action_name_snake = A::TYPE_NAME.replace("/", "_"); + + let ros2_node = self.ros2_node.clone(); + self.registry + .register_node_builder_fallible( + NodeBuilderOptions::new(format!("{node_name_snake}__{action_name_snake}__client")) + .with_default_display_text(format!("{action_name_minimal} Client")), + move |builder, config: ActionClientConfig| { + let node = builder + .create_ros2_action_client::(ros2_node.clone(), &config)?; + Ok(node) + }, + ) + .with_result(); + + self.registry + .register_message::>() + .with_result(); + + self.registry.register_message::(); + self.registry.register_message::(); + self.registry.register_message::(); + + // For cancel sender + self.registry + .opt_out() + .no_serializing() + .no_deserializing() + .register_message::>(); + + self + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct ActionClientConfig { + /// The name of the action that this client will send requests to + pub action: Arc, + /// The quality of service profile for the goal service + pub goal_service_qos: QosOptions, + /// The quality of service profile for the result service + pub result_service_qos: QosOptions, + /// The quality of service profile for the cancel service + pub cancel_service_qos: QosOptions, + /// The quality of service profile for the feedback topic + pub feedback_topic_qos: QosOptions, + /// The quality of service profile for the status topic + pub status_topic_qos: QosOptions, +} + +impl<'a> From<&'a ActionClientConfig> for ActionClientOptions<'a> { + fn from(value: &'a ActionClientConfig) -> Self { + let mut options = ActionClientOptions::new(&value.action); + value + .goal_service_qos + .apply_to(&mut options.goal_service_qos); + value + .result_service_qos + .apply_to(&mut options.result_service_qos); + value + .cancel_service_qos + .apply_to(&mut options.cancel_service_qos); + value + .feedback_topic_qos + .apply_to(&mut options.feedback_topic_qos); + value + .status_topic_qos + .apply_to(&mut options.status_topic_qos); + options + } +} diff --git a/src/lib.rs b/src/lib.rs index ac1c8a4e..ab230c5e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -137,6 +137,11 @@ pub use provider::*; pub mod request; pub use request::*; +#[cfg(feature = "ros2")] +pub mod ros2; +#[cfg(feature = "ros2")] +pub use ros2::*; + pub mod service; pub use service::*; @@ -434,5 +439,8 @@ pub mod prelude { diagram::{Diagram, DiagramElementRegistry, DiagramError, NodeBuilderOptions, Section}, }; + #[cfg(feature = "ros2")] + pub use crate::BuildRos2; + pub use futures::FutureExt; } diff --git a/src/ros2.rs b/src/ros2.rs new file mode 100644 index 00000000..014764c3 --- /dev/null +++ b/src/ros2.rs @@ -0,0 +1,315 @@ +/* + * Copyright (C) 2025 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +use crate::{AsyncMap, Builder, NeverFinish, Node, StreamPack}; +use futures::{ + future::{select, Either}, + StreamExt, +}; +use futures_lite::future::race; +use rclrs::{ + ActionClientOptions, ActionIDL, CancelResponse, ClientOptions, GoalEvent, GoalStatus, + GoalStatusCode, IntoPrimitiveOptions, MessageIDL, Node as Ros2Node, PublisherOptions, + RclrsError, ServiceIDL, SubscriptionOptions, +}; +use schemars::JsonSchema; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use std::{pin::pin, sync::Arc}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; + +#[derive(StreamPack)] +pub struct SubscriptionStreams { + pub out: T, + pub canceller: UnboundedSender, +} + +#[derive(StreamPack)] +pub struct ServiceClientStreams { + pub canceller: UnboundedSender, +} + +pub type ServiceClientNode = Node< + ::Request, + Result<::Response, Result>, + ServiceClientStreams, +>; + +#[derive(StreamPack)] +pub struct ActionClientStreams { + pub feedback: A::Feedback, + pub canceller: UnboundedSender, + pub status: GoalStatus, + pub cancellation_response: ActionCancellation, +} + +pub type ActionClientNode = Node< + ::Goal, + Result::Result>, Result>, + ActionClientStreams, +>; + +/// This is the final result returned by a ROS 2 action if it ended under normal +/// circumstances. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct ActionResult { + pub status: GoalStatusCode, + pub result: R, +} + +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct ActionCancellation { + pub signal: CancelSignal, + pub response: CancelResponse, +} + +pub trait BuildRos2 { + fn create_ros2_subscription<'o, T: MessageIDL, CancelSignal>( + &mut self, + ros2_node: Ros2Node, + options: impl Into>, + ) -> Node<(), Result, SubscriptionStreams> + where + CancelSignal: 'static + Send + Sync; + + fn create_ros2_publisher<'o, T: MessageIDL>( + &mut self, + ros2_node: Ros2Node, + options: impl Into>, + ) -> Result>, RclrsError>; + + fn create_ros2_service_client<'o, S: ServiceIDL, CancelSignal>( + &mut self, + ros2_node: Ros2Node, + options: impl Into>, + ) -> Result, RclrsError> + where + CancelSignal: 'static + Send + Sync; + + fn create_ros2_action_client<'o, A: ActionIDL, CancelSignal>( + &mut self, + ros2_node: Ros2Node, + options: impl Into>, + ) -> Result, RclrsError> + where + CancelSignal: 'static + Send + Sync, + A::Result: Serialize + DeserializeOwned + JsonSchema; +} + +impl<'w, 's, 'a> BuildRos2 for Builder<'w, 's, 'a> { + fn create_ros2_subscription<'o, T: MessageIDL, CancelSignal>( + &mut self, + ros2_node: Ros2Node, + options: impl Into>, + ) -> Node<(), Result, SubscriptionStreams> + where + CancelSignal: 'static + Send + Sync, + { + let SubscriptionOptions { topic, qos, .. } = options.into(); + let topic: Arc = topic.into(); + self.create_map( + move |input: AsyncMap<(), SubscriptionStreams>| { + let topic = topic.clone(); + let qos = qos.clone(); + let ros2_node = ros2_node.clone(); + let (cancel_sender, mut cancel_receiver) = unbounded_channel(); + input.streams.canceller.send(cancel_sender); + + let subscribing = async move { + let mut receiver = ros2_node + .create_subscription_receiver::((&topic).qos(qos)) + .map_err(|err| err.to_string())?; + + while let Some(msg) = receiver.recv().await { + input.streams.out.send(msg); + } + + unreachable!( + "The channel of a SubscriptionReceiver can never close \ + because it keeps ownership of both the sender and receiver." + ); + }; + + let cancellation = async move { + let Some(msg) = cancel_receiver.recv().await else { + // The canceller was dropped, meaning the user will never + // cancel this node, so it should just keep running forever + // (it will be forcibly stopped during the workflow cleanup) + NeverFinish.await; + unreachable!("this future will never finish"); + }; + + Ok(msg) + }; + + race(subscribing, cancellation) + }, + ) + } + + fn create_ros2_publisher<'o, T: MessageIDL>( + &mut self, + ros2_node: Ros2Node, + options: impl Into>, + ) -> Result>, RclrsError> { + let publisher = ros2_node.create_publisher(options)?; + let node = self.create_map_block(move |message: T| { + publisher.publish(message).map_err(|err| err.to_string()) + }); + Ok(node) + } + + fn create_ros2_service_client<'o, S: ServiceIDL, CancelSignal>( + &mut self, + ros2_node: Ros2Node, + options: impl Into>, + ) -> Result, RclrsError> + where + CancelSignal: 'static + Send + Sync, + { + let client = ros2_node.create_client::(options)?; + let node = self.create_map( + move |input: AsyncMap>| { + let AsyncMap { + request, streams, .. + } = input; + let client = Arc::clone(&client); + let (cancel_sender, mut cancel_receiver) = unbounded_channel(); + streams.canceller.send(cancel_sender); + + let receive = async move { + client + .notify_on_service_ready() + .await + .map_err(|_| Err(String::from("failed to wait for action server")))?; + + let receiver = client.call(request); + let response: Result = match receiver { + Ok(receiver) => receiver.await, + Err(err) => return Err(Err(err.to_string())), + }; + + let Ok(response) = response else { + return Err(Err(String::from("response was cancelled by the executor"))); + }; + + Ok(response) + }; + + let cancellation = async move { + let Some(msg) = cancel_receiver.recv().await else { + // The canceller was dropped, meaning the user will never + // cancel this node, so it should just keep running forever + // (it will be forcibly stopped during the workflow cleanup) + NeverFinish.await; + unreachable!("this future will never finish"); + }; + + Err(Ok(msg)) + }; + + race(receive, cancellation) + }, + ); + + Ok(node) + } + + fn create_ros2_action_client<'o, A: ActionIDL, CancelSignal>( + &mut self, + ros2_node: Ros2Node, + options: impl Into>, + ) -> Result, RclrsError> + where + CancelSignal: 'static + Send + Sync, + A::Result: Serialize + DeserializeOwned + JsonSchema, + { + let client = ros2_node.create_action_client::(options)?; + let node = self.create_map( + move |input: AsyncMap>| { + let AsyncMap { + request, streams, .. + } = input; + let client = Arc::clone(&client); + let (cancel_sender, mut cancel_receiver) = unbounded_channel(); + streams.canceller.send(cancel_sender); + + async move { + client + .notify_on_action_ready() + .await + .map_err(|_| Err(String::from("failed to wait for action server")))?; + + let goal_requested = client.request_goal(request); + let Some(goal_client) = goal_requested.await else { + return Err(Err(String::from("goal was rejected"))); + }; + + let canceller = goal_client.cancellation.clone(); + let mut goal_client_stream = goal_client.stream(); + + let receiving = async move { + while let Some(event) = goal_client_stream.next().await { + match event { + GoalEvent::Feedback(feedback) => { + streams.feedback.send(feedback); + } + GoalEvent::Status(status) => { + streams.status.send(status); + } + GoalEvent::Result((status, result)) => { + return Ok(ActionResult { status, result }); + } + } + } + + Err(Err(String::from( + "goal stream closed without receiving result", + ))) + }; + + let cancellation = async move { + loop { + let Some(signal) = cancel_receiver.recv().await else { + // The canceller was dropped, meaning the user will never + // cancel this node, so it should just keep running forever + // (it will be forcibly stopped during the node cleanup) + NeverFinish.await; + unreachable!("this future will never finish"); + }; + + let response = canceller.cancel().await; + streams + .cancellation_response + .send(ActionCancellation { signal, response }); + } + }; + + match select(pin!(receiving), pin!(cancellation)).await { + Either::Left((received, _)) => { + return received; + } + Either::Right(_) => { + unreachable!("the cancellation future will never finish"); + } + } + } + }, + ); + + Ok(node) + } +} diff --git a/src/utils.rs b/src/utils.rs index f581f706..957a78eb 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -16,6 +16,7 @@ */ //! Miscellaneous utilities used internally + use std::{ future::Future, pin::Pin,