ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange
Ask Your Question
5

ROS2 : How to call a service from the callback function of a subscriber ?

asked 2018-08-30 04:38:04 -0600

Marc Testier gravatar image

updated 2018-08-30 04:49:45 -0600

Hello,

This question is actually related to : https://answers.ros.org/question/3018...

But I wrote a minimal example and wanted to have your feedback on how to fix this.

So I'm using ROS2 Bouncy, built from source on Ubuntu 18.04. I have a node that subscribe to a topic and in the callback function of the sub, I get stuck if I try to rclpy.spin_once(self).

I wrote a minimal package to show the problem, which can be found here : https://github.com/MarcTestier/test_r...

I was actually trying to call a service from the callback function but got stuck when doing rclpy.spin_until_future_complete(node, future) to get the answer form the service.

So any idea how to call a service from the callback function of a subscriber ?? Maybe threads ??

Thanks.

edit retag flag offensive close merge delete

4 Answers

Sort by ยป oldest newest most voted
2

answered 2018-08-30 21:12:20 -0600

Marc Testier gravatar image

Here is the solution proposed by @codebot to call a service from the callback of a sub : https://github.com/codebot/ros2_patte...

Here is what he said about the solution though :

The key issue is that calling a service synchronously from within a subscriber callback is usually not a good design, because it's possible that the inbound messages will be coming more quickly than the service can process them, which could result in an uncontrolled message queue forming. The general pattern is that topic callbacks should be "fast".

Anyway, if you still want to try :

To accomplish this in rclpy, we can asynchronously issue the service request. Then in the main spin() loop, we can look at the array of futures and, whenever one is completed, we can read its result.

So the way I threw that together will also result in an unbounded request queue if the inbound message stream is coming really fast and the service is slow but you could control that by looking at the length of self.client_futures and not adding to it if it's already larger than some threshold.

#!/usr/bin/env python3
import sys

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from test_msgs.srv import Empty

class SubscriberClientAsync(Node):

    def __init__(self):
        super().__init__('sub_client_node')
        self.client = self.create_client(Empty, 'service')
        self.sub = self.create_subscription(String, 'topic', self.topic_cb)
        self.client_futures = []

    def topic_cb(self, msg):
        print("sub_cb({})".format(msg.data))
        self.client_futures.append(self.client.call_async(Empty.Request()))

    def spin(self):
        while rclpy.ok():
            rclpy.spin_once(self)
            incomplete_futures = []
            for f in self.client_futures:
                if f.done():
                    res = f.result()
                    print("received service result: {}".format(res))
                else:
                    incomplete_futures.append(f)
            self.client_futures = incomplete_futures

if __name__ == '__main__':
    rclpy.init()
    node = SubscriberClientAsync()
node.spin()
edit flag offensive delete link more

Comments

2

@Marc Testier

The key issue is that calling a service synchronously from within a subscriber callback is usually not a good design

what should I do if I wanna call a service synchronously due to the msg from a subscriber callback? Can you give me an idea what I should look into?

lin404 gravatar image lin404  ( 2020-01-28 02:33:08 -0600 )edit

just in case if link mentioned in msg above doesn't works, here is the new one. (https://github.com/codebot/ros2_patte...)

aarsh_t gravatar image aarsh_t  ( 2021-10-29 02:02:09 -0600 )edit
2

answered 2020-05-18 05:56:35 -0600

Andreas Z gravatar image

The problem is, that you can't use the spin_node_until_future_complete() command inside the outer callback. The solution is to use an MultiThreadedExecutor and different callback groups. Then you can call the inner service by giving it a callback function (instead of waiting for the return code with spin_node_until_future_complete).

If you want to wait for the result from the inner service call before the outer service callback finishes you need to block the thread with a while(return_from_inner_service == false) loop. This is important if your outer callback is from a service, because then you want to receive the inner service result and use it to compute the result of the outer service. But this is bad design, because you block the spin() call from the outer service. Luckily we can still handle other callbacks with the MultiThreadedExecutor as long as we assign them to a different callback group. If the outer callback is from a topic you don't want to block the callback, just handle the inner service call inside the assigned callback function. It will execute asynchronous.

Here is some code example in C++ (assuming all necessary headers included and ROS services created)

Create the MultiThreadedExecuter and an instance of the Node in main:

int main(int argc, char *argv[])
{
    rclcpp::init(argc, argv);
    rclcpp::executors::MultiThreadedExecutor executor;
    auto node = std::make_shared<Communication>();
    executor.add_node(node);
    executor.spin();
    rclcpp::shutdown();
    return 0;
}

Create seperate Callback group and assign it to the inner service client (inside the Node/class constructor):

callback_group_input_ = this->create_callback_group(rclcpp::callback_group::CallbackGroupType::MutuallyExclusive);
get_input_client_ = this->create_client<petra_core::srv::GetInput>("GetInput", rmw_qos_profile_services_default, callback_group_input_);

Inside the outer callback from a topic or service, send the inner_request and define a callback for it:

int choice = -1;
    auto inner_client_callback = [&,this](rclcpp::Client<petra_core::srv::GetInput>::SharedFuture inner_future)
        { 
            auto result = inner_future.get();
            choice = stoi(result->input);
            RCLCPP_INFO(this->get_logger(), "[inner service] callback executed");
        };
    auto inner_future_result = get_input_client_->async_send_request(inner_request, inner_client_callback);

If it is necessary to wait inside the outer callback for the result block the thread:

// Wait for the result.
    while (choice < 0 && rclcpp::ok())
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }

I hope it helps you, or whoever has the same problem. I was stuck with this for weeks...

edit flag offensive delete link more
1

answered 2023-04-13 14:43:49 -0600

ld_returned_1 gravatar image

I was stuck on this for whole day, so I'm adding my solution in case anyone else runs into a similar problem. The previously mentioned solutions are on the right track, but they are incomplete. There is an official solution in the ros docs, which I am pasting here just in case the link breaks.

Note that this problem is not unique to calling a service client from within a subscriber callback. This problem occurs whenever you have to call a callback from within a callback. Therefore the example of calling a service from within the timer callback is effectively the same as calling a service from within a subscription callback, or calling a service from within a service callback.

The solution is to put your nested callbacks in different callback groups (see the link above) and to use std::future::wait_for() instead of rclcpp::spin_until_future_complete().

#include <chrono>
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "std_srvs/srv/empty.hpp"

using namespace std::chrono_literals;

namespace cb_group_demo
{
class DemoNode : public rclcpp::Node
{
public:
    DemoNode() : Node("client_node")
    {
        client_cb_group_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
        timer_cb_group_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
        client_ptr_ = this->create_client<std_srvs::srv::Empty>("test_service", rmw_qos_profile_services_default,
                                                                client_cb_group_);
        timer_ptr_ = this->create_wall_timer(1s, std::bind(&DemoNode::timer_callback, this),
                                            timer_cb_group_);
    }

private:
    rclcpp::CallbackGroup::SharedPtr client_cb_group_;
    rclcpp::CallbackGroup::SharedPtr timer_cb_group_;
    rclcpp::Client<std_srvs::srv::Empty>::SharedPtr client_ptr_;
    rclcpp::TimerBase::SharedPtr timer_ptr_;

    void timer_callback()
    {
        RCLCPP_INFO(this->get_logger(), "Sending request");
        auto request = std::make_shared<std_srvs::srv::Empty::Request>();
        auto result_future = client_ptr_->async_send_request(request);

        // Do this instead of rclcpp::spin_until_future_complete()
        std::future_status status = result_future.wait_for(10s);  // timeout to guarantee a graceful finish
        if (status == std::future_status::ready) {
            RCLCPP_INFO(this->get_logger(), "Received response");
            std::shared_ptr<ServiceT::Response> response = result_future.get();
            // Do something with response 
        }
    }
};  // class DemoNode
}   // namespace cb_group_demo

int main(int argc, char* argv[])
{
    rclcpp::init(argc, argv);
    auto client_node = std::make_shared<cb_group_demo::DemoNode>();
    rclcpp::executors::MultiThreadedExecutor executor;
    executor.add_node(client_node);

    RCLCPP_INFO(client_node->get_logger(), "Starting client node, shut down with CTRL-C");
    executor.spin();
    RCLCPP_INFO(client_node->get_logger(), "Keyboard interrupt, shutting down.\n");

    rclcpp::shutdown();
    return 0;
}
edit flag offensive delete link more

Comments

Do you know how to perform or whether it is possible such a service call, when a node is configured as a component and loaded as a composable node?

aserbremen gravatar image aserbremen  ( 2023-06-07 02:19:53 -0600 )edit
0

answered 2019-12-13 13:16:23 -0600

fmrico gravatar image

Dear ROS developers,

If you want a solution in C++, check out THIS. Pieces of code for illustrating the solution:

struct ActivationFuture
{
  std::shared_future<std::shared_ptr<bica_msgs::srv::ActivateComponent_Response_<std::allocator<void>>>> future;
  std::string component;
};

class MyNode : public rclcpp::Node
{
  rclcpp::Service<bica_msgs::srv::ActivateComponent>::SharedPtr activation_service_; 
  std::vector<ActivationFuture> pending_act_futures_;

  void
  MyNode::activateDependency(const std::string & dep)
  {
    auto client = this->create_client<bica_msgs::srv::ActivateComponent>("/" + dep + "/activate");

    if (!client->wait_for_service(100ms)) {
      RCLCPP_ERROR(get_logger(), "Service %s is not available.",
        client->get_service_name());
      return;
    }

    auto request = std::make_shared<bica_msgs::srv::ActivateComponent::Request>();
    request->activator = get_name();

    auto future = client->async_send_request(request);
    pending_act_futures_.push_back(ActivationFuture{future, dep});
  }

  bool
  MyNode::ok()
  {
    if (rclcpp::ok()) {
      while(!pending_act_futures_.empty()) {
        auto pending_future = pending_act_futures_.back();
        if ((rclcpp::spin_until_future_complete(this->get_node_base_interface(), pending_future.future)) !=
          rclcpp::executor::FutureReturnCode::SUCCESS)
        {
          RCLCPP_WARN(get_logger(), "Component [%s] failed to activate", pending_future.component.c_str());
        }
        pending_act_futures_.pop_back();
      }
    }

    return rclcpp::ok();
  }

  void
  MyNode::execute()
  {
    while (this->ok()) {
      rclcpp::spin_some(this->get_node_base_interface());
      rate_.sleep();
    }
}

The bad thing is that you need a different vector for each type of future. Any ideas?

I hope it helps!!!

edit flag offensive delete link more

Question Tools

1 follower

Stats

Asked: 2018-08-30 04:38:04 -0600

Seen: 12,954 times

Last updated: May 18 '20