Skip to content

Commit 8a00b73

Browse files
authored
ros action feedback and status support in cxx api of dora (dora-rs#1148)
2 parents 6325a9d + e51d37c commit 8a00b73

2 files changed

Lines changed: 175 additions & 15 deletions

File tree

libraries/extensions/ros2-bridge/msg-gen/src/types/action.rs

Lines changed: 78 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -221,13 +221,20 @@ impl Action {
221221
let send_goal = format_ident!("send_goal__{package_name}__{}", self.name);
222222
let cxx_send_goal = "send_goal".to_string();
223223

224-
let matches = format_ident!("matches__{package_name}__{}", self.name);
225-
let cxx_matches = format_ident!("matches");
224+
let response_matches = format_ident!("matches__{package_name}__{}_response", self.name);
225+
let cxx_response_matches = format_ident!("matches_response");
226+
let feedback_matches = format_ident!("matches__{package_name}__{}_feedback", self.name);
227+
let cxx_feedback_matches = format_ident!("matches_feedback");
228+
let status_matches = format_ident!("matches__{package_name}__{}_status", self.name);
229+
let cxx_status_matches = format_ident!("matches_status");
230+
226231
let downcast = format_ident!("action_downcast__{package_name}__{}", self.name);
227232
let cxx_downcast = format_ident!("downcast");
228233

229234
let goal_type_raw = format_ident!("{package_name}__{}_Goal", self.name);
230235
let result_type_raw = format_ident!("{package_name}__{}_Result", self.name);
236+
let feedback_type_raw = format_ident!("{package_name}__{}_Feedback", self.name);
237+
let status_type_raw = format_ident!("action_msgs__GoalStatus");
231238

232239
let result_type_raw_str = result_type_raw.to_string();
233240

@@ -244,8 +251,16 @@ impl Action {
244251
fn #send_goal(self: &mut #client_name, request: #goal_type_raw) -> Result<()>;
245252

246253
#[namespace = #package_name]
247-
#[cxx_name = #cxx_matches]
248-
fn #matches(self: &mut #client_name, event: &CombinedEvent) -> bool;
254+
#[cxx_name = #cxx_response_matches]
255+
fn #response_matches(self: &mut #client_name, event: &CombinedEvent) -> bool;
256+
257+
#[namespace = #package_name]
258+
#[cxx_name = #cxx_feedback_matches]
259+
fn #feedback_matches(self: &mut #client_name, event: &CombinedEvent) -> bool;
260+
261+
#[namespace = #package_name]
262+
#[cxx_name = #cxx_status_matches]
263+
fn #status_matches(self: &mut #client_name, event: &CombinedEvent) -> bool;
249264

250265
#[namespace = #package_name]
251266
#[cxx_name = #cxx_downcast]
@@ -265,14 +280,26 @@ impl Action {
265280
qos.into(),
266281
).map_err(|e| eyre::eyre!("{e:?}"))?;
267282
let (response_tx, response_rx) = flume::bounded(1);
268-
let stream = response_rx.into_stream().map(|v: eyre::Result<_>| Box::new(v) as Box<dyn std::any::Any + 'static>);
269-
let id = events.events.merge(Box::pin(stream));
283+
let response_stream = response_rx.into_stream().map(|v: eyre::Result<_>| Box::new(v) as Box<dyn std::any::Any + 'static>);
284+
let response_id = events.events.merge(Box::pin(response_stream));
285+
286+
let (feedback_tx, feedback_rx) = flume::bounded(1);
287+
let feedback_stream = feedback_rx.into_stream().map(|v: eyre::Result<_>| Box::new(v) as Box<dyn std::any::Any + 'static>);
288+
let feedback_id = events.events.merge(Box::pin(feedback_stream));
289+
290+
let (status_tx, status_rx) = flume::bounded(1);
291+
let status_stream = status_rx.into_stream().map(|v: eyre::Result<_>| Box::new(v) as Box<dyn std::any::Any + 'static>);
292+
let status_id = events.events.merge(Box::pin(status_stream));
270293

271294
Ok(Box::new(#client_name {
272295
client: std::sync::Arc::new(client),
273296
response_tx: std::sync::Arc::new(response_tx),
297+
feedback_tx: std::sync::Arc::new(feedback_tx),
298+
status_tx: std::sync::Arc::new(status_tx),
274299
executor: self.executor.clone(),
275-
stream_id: id,
300+
response_id: response_id,
301+
feedback_id: feedback_id,
302+
status_id: status_id,
276303
}))
277304
}
278305
}
@@ -281,8 +308,12 @@ impl Action {
281308
pub struct #client_name {
282309
client: std::sync::Arc<ros2_client::action::ActionClient< #package :: action :: #self_name>>,
283310
response_tx: std::sync::Arc<flume::Sender<eyre::Result<ffi::#result_type_raw>>>,
311+
feedback_tx: std::sync::Arc<flume::Sender<eyre::Result<ffi::#feedback_type_raw>>>,
312+
status_tx: std::sync::Arc<flume::Sender<eyre::Result<ffi::#status_type_raw>>>,
284313
executor: std::sync::Arc<futures::executor::ThreadPool>,
285-
stream_id: u32,
314+
response_id: u32,
315+
feedback_id: u32,
316+
status_id: u32,
286317
}
287318

288319
impl #client_name {
@@ -314,12 +345,20 @@ impl Action {
314345

315346
let feedback_handle = {
316347
let client_ref = Arc::clone(&client_arc);
348+
let feedback_tx = self.feedback_tx.clone();
317349
async move {
318350
let feedback_stream = client_ref.feedback_stream(goal_id);
319351
feedback_stream.for_each(|feedback| async {
320352
match feedback {
321-
Ok(feedback) => println!("Received feedback: {:?}", feedback),
322-
Err(e) => eprintln!("Error while receive feedback: {:?}", e),
353+
Ok(feedback) => {
354+
let feedback = Ok(feedback);
355+
if feedback_tx.send_async(feedback).await.is_err() {
356+
tracing::warn!("failed to send action feedback");
357+
}
358+
}
359+
Err(e) => {
360+
tracing::error!("Failed to receive feedback for request {goal_id:?}: {:?}", e);
361+
}
323362
}
324363
}).await;
325364
}
@@ -330,12 +369,20 @@ impl Action {
330369

331370
let status_handle = {
332371
let client_ref = Arc::clone(&client_arc);
372+
let status_tx = self.status_tx.clone();
333373
async move {
334374
let status_stream = client_ref.status_stream(goal_id);
335375
status_stream.for_each(|status| async {
336376
match status {
337-
Ok(status) => println!("Status update: {:?}", status),
338-
Err(e) => eprintln!("Error receiving status update: {:?}", e),
377+
Ok(status) => {
378+
let status = Ok(status.into());
379+
if status_tx.send_async(status).await.is_err() {
380+
tracing::warn!("failed to send action status");
381+
}
382+
}
383+
Err(e) => {
384+
tracing::error!("Failed to receive status for request {goal_id:?}: {:?}", e);
385+
}
339386
}
340387
}).await;
341388
}
@@ -366,9 +413,25 @@ impl Action {
366413
}
367414

368415
#[allow(non_snake_case)]
369-
fn #matches(&self, event: &crate::ffi::CombinedEvent) -> bool {
416+
fn #response_matches(&self, event: &crate::ffi::CombinedEvent) -> bool {
417+
match &event.event.as_ref().0 {
418+
Some(crate::MergedEvent::External(event)) if event.id == self.response_id => true,
419+
_ => false
420+
}
421+
}
422+
423+
#[allow(non_snake_case)]
424+
fn #feedback_matches(&self, event: &crate::ffi::CombinedEvent) -> bool {
425+
match &event.event.as_ref().0 {
426+
Some(crate::MergedEvent::External(event)) if event.id == self.feedback_id => true,
427+
_ => false
428+
}
429+
}
430+
431+
#[allow(non_snake_case)]
432+
fn #status_matches(&self, event: &crate::ffi::CombinedEvent) -> bool {
370433
match &event.event.as_ref().0 {
371-
Some(crate::MergedEvent::External(event)) if event.id == self.stream_id => true,
434+
Some(crate::MergedEvent::External(event)) if event.id == self.status_id => true,
372435
_ => false
373436
}
374437
}
@@ -378,7 +441,7 @@ impl Action {
378441
use eyre::WrapErr;
379442

380443
match (*event.event).0 {
381-
Some(crate::MergedEvent::External(event)) if event.id == self.stream_id => {
444+
Some(crate::MergedEvent::External(event)) if event.id == self.response_id => {
382445
let result = event.event.downcast::<eyre::Result<ffi::#result_type_raw>>()
383446
.map_err(|_| eyre::eyre!("downcast to {} failed", #result_type_raw_str))?;
384447

libraries/extensions/ros2-bridge/msg-gen/src/types/message.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,101 @@ impl Message {
178178
(quote! {}, quote! {})
179179
};
180180

181+
let msg_converter = match struct_raw_name.to_string().as_str() {
182+
"action_msgs__GoalStatus" => quote! {
183+
impl From<crate::ros2_client::action_msgs::GoalStatus> for ffi::action_msgs__GoalStatus {
184+
fn from(status: crate::ros2_client::action_msgs::GoalStatus) -> Self {
185+
use crate::ros2_client::action_msgs::GoalStatus;
186+
let GoalStatus {goal_info, status} = status;
187+
Self { goal_info: goal_info.into(), status: status as i8 }
188+
}
189+
}
190+
},
191+
"action_msgs__GoalInfo" => quote! {
192+
impl From<crate::ros2_client::action_msgs::GoalInfo> for ffi::action_msgs__GoalInfo {
193+
fn from(info: crate::ros2_client::action_msgs::GoalInfo) -> Self {
194+
use crate::ros2_client::action_msgs::GoalInfo;
195+
let GoalInfo {goal_id, stamp} = info;
196+
Self { goal_id: goal_id.into(), stamp: stamp.into()}
197+
}
198+
}
199+
},
200+
"unique_identifier_msgs__UUID" => quote! {
201+
impl From<crate::ros2_client::unique_identifier_msgs::UUID> for ffi::unique_identifier_msgs__UUID {
202+
fn from(uuid: crate::ros2_client::unique_identifier_msgs::UUID) -> Self {
203+
use crate::ros2_client::unique_identifier_msgs::UUID;
204+
let UUID {uuid} = uuid;
205+
let mut buf = [0u8; 16];
206+
uuid.as_simple().encode_lower(&mut buf);
207+
Self { uuid: buf }
208+
}
209+
}
210+
},
211+
"builtin_interfaces__Time" => quote! {
212+
impl From<crate::ros2_client::builtin_interfaces::Time> for ffi::builtin_interfaces__Time {
213+
fn from(time: crate::ros2_client::builtin_interfaces::Time) -> Self {
214+
let t = time.to_nanos();
215+
let quot = t / 1_000_000_000;
216+
let rem = t % 1_000_000_000;
217+
218+
// https://doc.rust-lang.org/reference/expressions/operator-expr.html#arithmetic-and-logical-binary-operators
219+
// "Rust uses a remainder defined with truncating division.
220+
// Given remainder = dividend % divisor,
221+
// the remainder will have the same sign as the dividend."
222+
223+
if rem >= 0 {
224+
// positive time, no surprise here
225+
// OR, negative time, but a whole number of seconds, fractional part is zero
226+
Self {
227+
// Saturate seconds to i32. This is different from C++ implementation
228+
// in rclcpp, which just uses
229+
// `ret.sec = static_cast<std::int32_t>(result.quot)`.
230+
sec: if quot > (i32::MAX as i64) {
231+
tracing::warn!("rcl_interfaces::Time conversion overflow");
232+
i32::MAX
233+
} else if quot < (i32::MIN as i64) {
234+
tracing::warn!("rcl_interfaces::Time conversion underflow");
235+
i32::MIN
236+
} else {
237+
quot as i32
238+
},
239+
nanosec: rem as u32,
240+
}
241+
} else {
242+
// Now `t` is negative AND `rem` is non-zero.
243+
// We do some non-obvious arithmetic:
244+
245+
// saturate whole seconds
246+
let quot_sat = if quot >= (i32::MIN as i64) {
247+
quot as i32
248+
} else {
249+
tracing::warn!("rcl_interfaces::Time conversion underflow");
250+
i32::MIN
251+
};
252+
253+
// Now, `rem` is between -999_999_999 and -1, inclusive.
254+
// Case rem = 0 is included in the positive branch.
255+
//
256+
// Adding 1_000_000_000 will make it positive, so cast to u32 is ok.
257+
//
258+
// It is also the right thing to do, because
259+
// * 0.0 sec = 0 sec and 0 nanosec
260+
// * -0.000_000_001 sec = -1 sec and 999_999_999 nanosec
261+
// * ...
262+
// * -0.99999999999 sec = -1 sec and 000_000_001 nanosec
263+
// * -1.0 sec = -1 sec and 0 nanosec
264+
// * -1.00000000001 sec = -2 sec and 999_999_999 nanosec
265+
Self {
266+
sec: quot_sat - 1, // note -1
267+
nanosec: (1_000_000_000 + rem) as u32,
268+
}
269+
}
270+
}
271+
}
272+
},
273+
_ => quote! {},
274+
};
275+
181276
let def = if self.members.is_empty() {
182277
quote! {
183278
#[allow(non_camel_case_types)]
@@ -221,6 +316,8 @@ impl Message {
221316

222317
}
223318

319+
#msg_converter
320+
224321
impl crate::_core::InternalDefault for ffi::#struct_raw_name {
225322
fn _default() -> Self {
226323
#default

0 commit comments

Comments
 (0)