diff --git a/image_transport/CMakeLists.txt b/image_transport/CMakeLists.txt index 089d5a12..61be914a 100644 --- a/image_transport/CMakeLists.txt +++ b/image_transport/CMakeLists.txt @@ -23,7 +23,7 @@ find_package(sensor_msgs REQUIRED) # Build image_transport library add_library(${PROJECT_NAME} src/camera_common.cpp - src/publisher.cpp + src/plugin_publisher.cpp src/subscriber.cpp src/single_subscriber_publisher.cpp src/camera_publisher.cpp diff --git a/image_transport/default_plugins.xml b/image_transport/default_plugins.xml index 70c86767..b0bb67e6 100644 --- a/image_transport/default_plugins.xml +++ b/image_transport/default_plugins.xml @@ -1,13 +1,4 @@ - - - This is the default publisher. It publishes the Image as-is on the base topic. - - - #include +#include #include #include +#include "loader_fwds.hpp" #include "rclcpp/node.hpp" #include "image_transport/camera_publisher.hpp" @@ -46,6 +48,11 @@ namespace image_transport { +using Publisher = PublisherBase; + +PubLoaderPtr getPubLoader(); +SubLoaderPtr getSubLoader(); + /*! * \brief Advertise an image topic, free function version. */ @@ -56,6 +63,21 @@ Publisher create_publisher( rmw_qos_profile_t custom_qos = rmw_qos_profile_default, rclcpp::PublisherOptions options = rclcpp::PublisherOptions()); +template< + typename MessageT, + typename AllocatorT = std::allocator, + typename PublisherT = PublisherBase> +std::shared_ptr +IMAGE_TRANSPORT_PUBLIC +create_type_adapted_publisher( + rclcpp::Node * node, + const std::string & base_topic, + rmw_qos_profile_t custom_qos = rmw_qos_profile_default, + rclcpp::PublisherOptions options = rclcpp::PublisherOptions()) +{ + return std::make_shared(node, base_topic, getPubLoader(), custom_qos, options); +} + /** * \brief Subscribe to an image topic, free function version. */ @@ -68,6 +90,21 @@ Subscriber create_subscription( rmw_qos_profile_t custom_qos = rmw_qos_profile_default, rclcpp::SubscriptionOptions options = rclcpp::SubscriptionOptions()); +template +typename rclcpp::Subscription::SharedPtr create_type_adapted_subscription( + rclcpp::Node * node, + const std::string & base_topic, + CallbackT && callback, + rmw_qos_profile_t custom_qos = rmw_qos_profile_default, + rclcpp::SubscriptionOptions options = rclcpp::SubscriptionOptions()) +{ + auto qos = rclcpp::QoS(rclcpp::QoSInitialization::from_rmw(custom_qos), custom_qos); + return node->create_subscription( + base_topic, qos, + std::forward(callback), + options); +} + /*! * \brief Advertise a camera, free function version. */ diff --git a/image_transport/include/image_transport/plugin_publisher.hpp b/image_transport/include/image_transport/plugin_publisher.hpp new file mode 100644 index 00000000..4475e6e7 --- /dev/null +++ b/image_transport/include/image_transport/plugin_publisher.hpp @@ -0,0 +1,125 @@ +// Copyright (c) 2009, Willow Garage, Inc. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// * Neither the name of the Willow Garage nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. + +#ifndef IMAGE_TRANSPORT__PLUGIN_PUBLISHER_HPP_ +#define IMAGE_TRANSPORT__PLUGIN_PUBLISHER_HPP_ + +#include +#include + +#include "rclcpp/macros.hpp" +#include "rclcpp/node.hpp" + +#include "sensor_msgs/msg/image.hpp" + +#include "image_transport/exception.hpp" +#include "image_transport/loader_fwds.hpp" +#include "image_transport/single_subscriber_publisher.hpp" +#include "image_transport/visibility_control.hpp" + +namespace image_transport +{ + +/** + * \brief Manages advertisements of multiple transport options on an Image topic. + * + * Publisher is a drop-in replacement for ros::Publisher when publishing + * Image topics. In a minimally built environment, they behave the same; however, + * Publisher is extensible via plugins to publish alternative representations of + * the image on related subtopics. This is especially useful for limiting bandwidth and + * latency over a network connection, when you might (for example) use the theora plugin + * to transport the images as streamed video. All topics are published only on demand + * (i.e. if there are subscribers). + * + * A Publisher should always be created through a call to ImageTransport::advertise(), + * or copied from one that was. + * Once all copies of a specific Publisher go out of scope, any subscriber callbacks + * associated with that handle will stop being called. Once all Publisher for a + * given base topic go out of scope the topic (and all subtopics) will be unadvertised. + */ +class PluginPublisher +{ +public: + IMAGE_TRANSPORT_PUBLIC + PluginPublisher() = default; + + IMAGE_TRANSPORT_PUBLIC + PluginPublisher( + rclcpp::Node * nh, + const std::string & image_topic, + PubLoaderPtr loader, + rmw_qos_profile_t custom_qos, + rclcpp::PublisherOptions options = rclcpp::PublisherOptions()); + + /*! + * \brief Returns the number of subscribers that are currently connected to + * this Publisher. + * + * Returns the total number of subscribers to all advertised topics. + */ + IMAGE_TRANSPORT_PUBLIC + size_t getNumSubscribers() const; + + /*! + * \brief Publish an image on the topics associated with this Publisher. + */ + IMAGE_TRANSPORT_PUBLIC + void publish(const sensor_msgs::msg::Image & message) const; + + /*! + * \brief Publish an image on the topics associated with this Publisher. + */ + IMAGE_TRANSPORT_PUBLIC + void publish(const sensor_msgs::msg::Image::ConstSharedPtr & message) const; + + /*! + * \brief Shutdown the advertisements associated with this Publisher. + */ + IMAGE_TRANSPORT_PUBLIC + void shutdown(); + + IMAGE_TRANSPORT_PUBLIC + operator void *() const; + + IMAGE_TRANSPORT_PUBLIC + bool operator<(const PluginPublisher & rhs) const {return impl_ < rhs.impl_;} + + IMAGE_TRANSPORT_PUBLIC + bool operator!=(const PluginPublisher & rhs) const {return impl_ != rhs.impl_;} + + IMAGE_TRANSPORT_PUBLIC + bool operator==(const PluginPublisher & rhs) const {return impl_ == rhs.impl_;} + +private: + struct Impl; + std::shared_ptr impl_; +}; + +} // namespace image_transport + +#endif // IMAGE_TRANSPORT__PLUGIN_PUBLISHER_HPP_ diff --git a/image_transport/include/image_transport/publisher.hpp b/image_transport/include/image_transport/publisher.hpp index dc28d017..60f2105a 100644 --- a/image_transport/include/image_transport/publisher.hpp +++ b/image_transport/include/image_transport/publisher.hpp @@ -29,8 +29,9 @@ #ifndef IMAGE_TRANSPORT__PUBLISHER_HPP_ #define IMAGE_TRANSPORT__PUBLISHER_HPP_ -#include #include +#include +#include #include "rclcpp/macros.hpp" #include "rclcpp/node.hpp" @@ -41,6 +42,7 @@ #include "image_transport/loader_fwds.hpp" #include "image_transport/single_subscriber_publisher.hpp" #include "image_transport/visibility_control.hpp" +#include "image_transport/plugin_publisher.hpp" namespace image_transport { @@ -62,19 +64,47 @@ namespace image_transport * associated with that handle will stop being called. Once all Publisher for a * given base topic go out of scope the topic (and all subtopics) will be unadvertised. */ -class Publisher +template> +class PublisherBase { public: + using PublishedType = typename rclcpp::TypeAdapter::custom_type; + using ROSMessageType = typename rclcpp::TypeAdapter::ros_message_type; + + using PublishedTypeAllocatorTraits = rclcpp::allocator::AllocRebind; + using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type; + using PublishedTypeDeleter = rclcpp::allocator::Deleter; + + using ROSMessageTypeAllocatorTraits = rclcpp::allocator::AllocRebind; + using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; + using ROSMessageTypeDeleter = rclcpp::allocator::Deleter; + + static_assert( + std::is_same_v, + "Ros Message Type must be sensor_msgs::msg::Image"); + + RCLCPP_SMART_PTR_DEFINITIONS(PublisherBase) + IMAGE_TRANSPORT_PUBLIC - Publisher() = default; + PublisherBase() = default; IMAGE_TRANSPORT_PUBLIC - Publisher( + PublisherBase( rclcpp::Node * nh, const std::string & base_topic, PubLoaderPtr loader, rmw_qos_profile_t custom_qos, - rclcpp::PublisherOptions options = rclcpp::PublisherOptions()); + rclcpp::PublisherOptions options = rclcpp::PublisherOptions()) + // Resolve the name explicitly because otherwise the compressed topics don't remap + // properly (#3652). + : image_topic_(rclcpp::expand_topic_or_service_name(base_topic, nh->get_name(), + nh->get_namespace())) + , plugin_publisher(nh, image_topic_, std::move(loader), custom_qos, options) + { + RCLCPP_DEBUG(nh->get_logger(), "getTopicToAdvertise: %s", image_topic_.c_str()); + auto qos = rclcpp::QoS(rclcpp::QoSInitialization::from_rmw(custom_qos), custom_qos); + raw_publisher = nh->create_publisher(image_topic_, qos, options); + } /*! * \brief Returns the number of subscribers that are currently connected to @@ -83,47 +113,135 @@ class Publisher * Returns the total number of subscribers to all advertised topics. */ IMAGE_TRANSPORT_PUBLIC - size_t getNumSubscribers() const; + size_t getNumSubscribers() const + { + return raw_publisher->get_subscription_count() + plugin_publisher.getNumSubscribers(); + } /*! * \brief Returns the base topic of this Publisher. */ IMAGE_TRANSPORT_PUBLIC - std::string getTopic() const; + std::string getTopic() const + { + return image_topic_; + } /*! * \brief Publish an image on the topics associated with this Publisher. */ + template + typename std::enable_if_t< + std::is_same::value + > IMAGE_TRANSPORT_PUBLIC - void publish(const sensor_msgs::msg::Image & message) const; + publish(const sensor_msgs::msg::Image & message) const + { + plugin_publisher.publish(message); + publish_raw(message); + } /*! * \brief Publish an image on the topics associated with this Publisher. */ + template + typename std::enable_if_t< + std::is_same::value + > IMAGE_TRANSPORT_PUBLIC - void publish(const sensor_msgs::msg::Image::ConstSharedPtr & message) const; + publish(const sensor_msgs::msg::Image::ConstSharedPtr & message) const + { + plugin_publisher.publish(message); + publish_raw(*message); + } + + /*! + * \brief Publish a type adapted image on the topics associated with this Publisher. + */ + template + typename std::enable_if_t::is_specialized::value && + std::is_same::value> + IMAGE_TRANSPORT_PUBLIC publish(std::unique_ptr msg) const + { + if (HasPluginSubscribers()) { + auto shared_msg = std::make_shared(); + rclcpp::TypeAdapter::convert_to_ros_message(*msg, *shared_msg); + + plugin_publisher.publish(shared_msg); + } + + publish_raw(std::move(msg)); + } + + template + typename std::enable_if_t::is_specialized::value && + std::is_same::value> + IMAGE_TRANSPORT_PUBLIC publish(const T & msg) const + { + if (HasPluginSubscribers()) { + auto shared_msg = std::make_shared(); + rclcpp::TypeAdapter::convert_to_ros_message(msg, *shared_msg); + plugin_publisher.publish(shared_msg); + RCLCPP_WARN(rclcpp::get_logger("freform"), "Publishing to plugin publisher"); + } + + publish_raw(msg); + } /*! * \brief Shutdown the advertisements associated with this Publisher. */ IMAGE_TRANSPORT_PUBLIC - void shutdown(); + void shutdown() + { + plugin_publisher.shutdown(); + raw_publisher.reset(); + } IMAGE_TRANSPORT_PUBLIC operator void *() const; IMAGE_TRANSPORT_PUBLIC - bool operator<(const Publisher & rhs) const {return impl_ < rhs.impl_;} + bool operator<(const PublisherBase & rhs) const {return plugin_publisher < rhs.plugin_publisher;} IMAGE_TRANSPORT_PUBLIC - bool operator!=(const Publisher & rhs) const {return impl_ != rhs.impl_;} + bool operator!=(const PublisherBase & rhs) const + { + return plugin_publisher != rhs.plugin_publisher; + } IMAGE_TRANSPORT_PUBLIC - bool operator==(const Publisher & rhs) const {return impl_ == rhs.impl_;} + bool operator==(const PublisherBase & rhs) const + { + return plugin_publisher == rhs.plugin_publisher; + } private: - struct Impl; - std::shared_ptr impl_; + template + void publish_raw(T && message) const + { + if (raw_publisher == nullptr) { + // TODO(ros2) Switch to RCUTILS_ASSERT when ros2/rcutils#112 is merged + RCLCPP_FATAL( + rclcpp::get_logger( + "image_transport"), "Call to publish() on an invalid image_transport::Publisher"); + + return; + } + + if (raw_publisher->get_subscription_count() > 0) { + raw_publisher->publish(std::forward(message)); + } + } + + bool HasPluginSubscribers() const + { + return plugin_publisher.getNumSubscribers() > 0; + } + + std::string image_topic_; + PluginPublisher plugin_publisher; + typename rclcpp::Publisher::SharedPtr raw_publisher; }; } // namespace image_transport diff --git a/image_transport/include/image_transport/raw_publisher.hpp b/image_transport/include/image_transport/raw_publisher.hpp deleted file mode 100644 index e52b26b9..00000000 --- a/image_transport/include/image_transport/raw_publisher.hpp +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright (c) 2009, Willow Garage, Inc. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// -// * Redistributions in binary form must reproduce the above copyright -// notice, this list of conditions and the following disclaimer in the -// documentation and/or other materials provided with the distribution. -// -// * Neither the name of the Willow Garage nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE -// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -// POSSIBILITY OF SUCH DAMAGE. - -#ifndef IMAGE_TRANSPORT__RAW_PUBLISHER_HPP_ -#define IMAGE_TRANSPORT__RAW_PUBLISHER_HPP_ - -#include - -#include "sensor_msgs/msg/image.hpp" - -#include "image_transport/simple_publisher_plugin.hpp" -#include "image_transport/visibility_control.hpp" - -namespace image_transport -{ - -/** - * \brief The default PublisherPlugin. - * - * RawPublisher is a simple wrapper for ros::Publisher, publishing unaltered Image - * messages on the base topic. - */ - -class RawPublisher : public SimplePublisherPlugin -{ -public: - virtual ~RawPublisher() {} - - virtual std::string getTransportName() const - { - return "raw"; - } - -protected: - virtual void publish(const sensor_msgs::msg::Image & message, const PublishFn & publish_fn) const - { - publish_fn(message); - } - - virtual std::string getTopicToAdvertise(const std::string & base_topic) const - { - return base_topic; - } -}; - -} // namespace image_transport - -#endif // IMAGE_TRANSPORT__RAW_PUBLISHER_HPP_ diff --git a/image_transport/include/image_transport/single_subscriber_publisher.hpp b/image_transport/include/image_transport/single_subscriber_publisher.hpp index e6347db8..f5975178 100644 --- a/image_transport/include/image_transport/single_subscriber_publisher.hpp +++ b/image_transport/include/image_transport/single_subscriber_publisher.hpp @@ -81,7 +81,8 @@ class SingleSubscriberPublisher GetNumSubscribersFn num_subscribers_fn_; PublishFn publish_fn_; - friend class Publisher; // to get publish_fn_ directly + template + friend class PublisherBase; // to get publish_fn_ directly }; typedef std::function SubscriberStatusCallback; diff --git a/image_transport/src/image_transport.cpp b/image_transport/src/image_transport.cpp index dfddcb91..d4262b52 100644 --- a/image_transport/src/image_transport.cpp +++ b/image_transport/src/image_transport.cpp @@ -56,13 +56,23 @@ struct Impl static Impl * kImpl = new Impl(); +PubLoaderPtr getPubLoader() +{ + return kImpl->pub_loader_; +} + +SubLoaderPtr getSubLoader() +{ + return kImpl->sub_loader_; +} + Publisher create_publisher( rclcpp::Node * node, const std::string & base_topic, rmw_qos_profile_t custom_qos, rclcpp::PublisherOptions options) { - return Publisher(node, base_topic, kImpl->pub_loader_, custom_qos, options); + return {node, base_topic, kImpl->pub_loader_, custom_qos, options}; } Subscriber create_subscription( diff --git a/image_transport/src/manifest.cpp b/image_transport/src/manifest.cpp index 0d1d78d6..bcfaeca6 100644 --- a/image_transport/src/manifest.cpp +++ b/image_transport/src/manifest.cpp @@ -28,8 +28,6 @@ #include -#include "image_transport/raw_publisher.hpp" #include "image_transport/raw_subscriber.hpp" -PLUGINLIB_EXPORT_CLASS(image_transport::RawPublisher, image_transport::PublisherPlugin) PLUGINLIB_EXPORT_CLASS(image_transport::RawSubscriber, image_transport::SubscriberPlugin) diff --git a/image_transport/src/publisher.cpp b/image_transport/src/plugin_publisher.cpp similarity index 84% rename from image_transport/src/publisher.cpp rename to image_transport/src/plugin_publisher.cpp index 0bed7b09..38a3a6f3 100644 --- a/image_transport/src/publisher.cpp +++ b/image_transport/src/plugin_publisher.cpp @@ -26,7 +26,7 @@ // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE // POSSIBILITY OF SUCH DAMAGE. -#include "image_transport/publisher.hpp" +#include "image_transport/plugin_publisher.hpp" #include #include @@ -46,7 +46,7 @@ namespace image_transport { -struct Publisher::Impl +struct PluginPublisher::Impl { explicit Impl(rclcpp::Node * node) : logger_(node->get_logger()), @@ -68,11 +68,6 @@ struct Publisher::Impl return count; } - std::string getTopic() const - { - return base_topic_; - } - bool isValid() const { return !unadvertised_; @@ -96,18 +91,12 @@ struct Publisher::Impl bool unadvertised_; }; -Publisher::Publisher( - rclcpp::Node * node, const std::string & base_topic, +PluginPublisher::PluginPublisher( + rclcpp::Node * node, const std::string & image_topic, PubLoaderPtr loader, rmw_qos_profile_t custom_qos, rclcpp::PublisherOptions options) : impl_(std::make_shared(node)) { - // Resolve the name explicitly because otherwise the compressed topics don't remap - // properly (#3652). - std::string image_topic = rclcpp::expand_topic_or_service_name( - base_topic, - node->get_name(), node->get_namespace()); - impl_->base_topic_ = image_topic; impl_->loader_ = loader; auto ns_len = node->get_effective_namespace().length(); @@ -150,27 +139,15 @@ Publisher::Publisher( lookup_name.c_str(), e.what()); } } - - if (impl_->publishers_.empty()) { - throw Exception( - "No plugins found! Does `rospack plugins --attrib=plugin " - "image_transport` find any packages?"); - } } -size_t Publisher::getNumSubscribers() const +size_t PluginPublisher::getNumSubscribers() const { if (impl_ && impl_->isValid()) {return impl_->getNumSubscribers();} return 0; } -std::string Publisher::getTopic() const -{ - if (impl_) {return impl_->getTopic();} - return std::string(); -} - -void Publisher::publish(const sensor_msgs::msg::Image & message) const +void PluginPublisher::publish(const sensor_msgs::msg::Image & message) const { if (!impl_ || !impl_->isValid()) { // TODO(ros2) Switch to RCUTILS_ASSERT when ros2/rcutils#112 is merged @@ -186,7 +163,7 @@ void Publisher::publish(const sensor_msgs::msg::Image & message) const } } -void Publisher::publish(const sensor_msgs::msg::Image::ConstSharedPtr & message) const +void PluginPublisher::publish(const sensor_msgs::msg::Image::ConstSharedPtr & message) const { if (!impl_ || !impl_->isValid()) { // TODO(ros2) Switch to RCUTILS_ASSERT when ros2/rcutils#112 is merged @@ -202,7 +179,7 @@ void Publisher::publish(const sensor_msgs::msg::Image::ConstSharedPtr & message) } } -void Publisher::shutdown() +void PluginPublisher::shutdown() { if (impl_) { impl_->shutdown(); @@ -210,7 +187,7 @@ void Publisher::shutdown() } } -Publisher::operator void *() const +PluginPublisher::operator void *() const { return (impl_ && impl_->isValid()) ? reinterpret_cast(1) : reinterpret_cast(0); } diff --git a/image_transport/test/test_message_passing.cpp b/image_transport/test/test_message_passing.cpp index 0a4372c3..6db80873 100644 --- a/image_transport/test/test_message_passing.cpp +++ b/image_transport/test/test_message_passing.cpp @@ -148,6 +148,122 @@ TEST_F(MessagePassingTesting, one_camera_message_passing) ASSERT_EQ(1, total_images_received); } +template<> +struct rclcpp::TypeAdapter< + std::string, + sensor_msgs::msg::Image +> +{ + using is_specialized = std::true_type; + using custom_type = std::string; + using ros_message_type = sensor_msgs::msg::Image; + + static + void + convert_to_ros_message( + const custom_type & source, + ros_message_type & destination) + { + destination.encoding = source; + } + + static + void + convert_to_custom( + const ros_message_type & source, + custom_type & destination) + { + destination = source.encoding; + } +}; + +using AdaptedType = rclcpp::TypeAdapter; + +TEST_F(MessagePassingTesting, type_adapted_message_passing) +{ + const size_t max_retries = 3; + const size_t max_loops = 200; + const std::chrono::milliseconds sleep_per_loop = std::chrono::milliseconds(10); + + rclcpp::executors::SingleThreadedExecutor executor; + total_images_received = 0; + + auto pub = + image_transport::create_type_adapted_publisher(node_.get(), "camera/image"); + auto sub = + image_transport::create_subscription(node_.get(), "camera/image", imageCallback, "raw"); + + test_rclcpp::wait_for_subscriber(node_, sub.getTopic()); + + ASSERT_EQ(0, total_images_received); + ASSERT_EQ(1u, pub->getNumSubscribers()); + ASSERT_EQ(1u, sub.getNumPublishers()); + + executor.spin_node_some(node_); + ASSERT_EQ(0, total_images_received); + + size_t retry = 0; + while (retry < max_retries && total_images_received == 0) { + // generate random image and publish it + pub->publish(std::make_unique("random_image")); + + executor.spin_node_some(node_); + size_t loop = 0; + while ((total_images_received != 1) && (loop++ < max_loops)) { + std::this_thread::sleep_for(sleep_per_loop); + executor.spin_node_some(node_); + } + } + + ASSERT_EQ(1, total_images_received); +} + +void typeAdaptedCallback(std::unique_ptr msg) +{ + (void) msg; + total_images_received++; +} + +TEST_F(MessagePassingTesting, type_adapted_message_subscription) +{ + const size_t max_retries = 3; + const size_t max_loops = 200; + const std::chrono::milliseconds sleep_per_loop = std::chrono::milliseconds(10); + + rclcpp::executors::SingleThreadedExecutor executor; + total_images_received = 0; + + auto pub = + image_transport::create_type_adapted_publisher(node_.get(), "camera/image"); + auto sub = + image_transport::create_type_adapted_subscription( + node_.get(), "camera/image", typeAdaptedCallback); + + test_rclcpp::wait_for_subscriber(node_, sub->get_topic_name()); + + ASSERT_EQ(0, total_images_received); + ASSERT_EQ(1u, pub->getNumSubscribers()); + ASSERT_EQ(1u, sub->get_publisher_count()); + + executor.spin_node_some(node_); + ASSERT_EQ(0, total_images_received); + + size_t retry = 0; + while (retry < max_retries && total_images_received == 0) { + // generate random image and publish it + pub->publish(std::make_unique("random_image")); + + executor.spin_node_some(node_); + size_t loop = 0; + while ((total_images_received != 1) && (loop++ < max_loops)) { + std::this_thread::sleep_for(sleep_per_loop); + executor.spin_node_some(node_); + } + } + + ASSERT_EQ(1, total_images_received); +} + /* TEST_F(MessagePassingTesting, stress_message_passing) {