ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange
Ask Your Question

Revision history [back]

click to hide/show revision 1
initial version

My answer may be broader than what OP is asking, but because the initial motivation was the same for me to start tackling (static_transform not looked up via Python api in ROS2 for me), I share here what I found. It's embarrassing to admit that simple mistake.

Simply, I just had to run lookup AFTER spin at least once.

The following code is verified to lookup with turtle_tf2_py tutorial along with online tutorial (docs.ros.org/en/galactic/Tutorials/Intermediate/Tf2).

Btw, I still haven't had "try2" approach working (the one @AndyZe suggested). I doubt there _may_ be some issues as I see rclpy#989? No idea.

#!/usr/bin/python3

import asyncio
import time
import yaml

import rclpy
import tf2_ros
from tf2_ros import TransformException

class TfFramesFinder(rclpy.node.Node):
    def __init__(self):
        super().__init__("tf2_frames_finder")
        self._tf_buffer = tf2_ros.buffer.Buffer()
        self._tf_listener = tf2_ros.transform_listener.TransformListener(self._tf_buffer, self, spin_thread = False)

    def try3(self, target_f="kts2_table_frame", source_f="world"):
        self.get_logger().info("Getting it3!")
        _MAX_ATTEMPTS = 15
        _attempts = 0
        _transform_res = None
        while _attempts < _MAX_ATTEMPTS:
        _time_snapshot = rclpy.time.Time()
            try:
                self.get_logger().info("Looking up {}th time".format(_attempts))

                _transform_res = self._tf_buffer.lookup_transform(
                    target_f,
                    source_f,
                    _time_snapshot)
                break
            except TransformException as ex:
                if _MAX_ATTEMPTS < _attempts:
                    raise RuntimeError(
                        f"Could not transform '{target_f}' to '{source_f}' at the time '{_time_snapshot}'.: {ex}")
                else:
                    self.get_logger().warn("Failed {} times".format(_attempts))
                    time.sleep(1)
            _attempts += 1
        self.get_logger().info("x: {}, y: {}".format(
            _transform_res.transform.translation.x, _transform_res.transform.translation.y))

    def try2(self, target_f="kts2_table_frame", source_f="world"):
        self.get_logger().info("Getting it!")
        tf_future = self._tf_buffer.wait_for_transform_async(
            target_frame=target_f,
            source_frame=source_f,
            time=rclpy.time.Time())
        self.get_logger().info("wait_for_transform_async returned. tf_future: {}".format(tf_future))
        rclpy.spin_until_future_complete(self, tf_future)
        self.get_logger().info("spin_until_future_complete returned.")
        world_to_reference_transform = asyncio.run(self._tf_buffer.lookup_transform_async(
            target_f, source_f, rclpy.time.Time()))

rclpy.init()
node = TfFramesFinder()
#frame_parent = "turtle2"
frame_parent = "world"
#frame_child = "turtle2"
frame_child = "static_3"
try:
    rclpy.spin_once(node)
    node.try3(
        target_f=frame_parent,
        source_f=frame_child)
except KeyboardInterrupt as e:
    print("k/b interrupted")

node.destroy_node()
rclpy.shutdown()

My answer may be broader than what OP is asking, but because the initial motivation was the same for me to start tackling (static_transform not looked up via Python api in ROS2 for me), I share here what I found. It's embarrassing to admit that simple mistake.

Simply, I just had to run lookup AFTER spin at least once.

The following code is verified to lookup with turtle_tf2_py tutorial along with online tutorial (docs.ros.org/en/galactic/Tutorials/Intermediate/Tf2).. Worked with static_transform too.

Btw, I still haven't had "try2" approach working (the one @AndyZe suggested). I doubt there _may_ be some issues as I see rclpy#989? No idea.

#!/usr/bin/python3

import asyncio
import time
import yaml

import rclpy
import tf2_ros
from tf2_ros import TransformException

class TfFramesFinder(rclpy.node.Node):
    def __init__(self):
        super().__init__("tf2_frames_finder")
        self._tf_buffer = tf2_ros.buffer.Buffer()
        self._tf_listener = tf2_ros.transform_listener.TransformListener(self._tf_buffer, self, spin_thread = False)

    def try3(self, target_f="kts2_table_frame", source_f="world"):
        self.get_logger().info("Getting it3!")
        _MAX_ATTEMPTS = 15
        _attempts = 0
        _transform_res = None
        while _attempts < _MAX_ATTEMPTS:
        _time_snapshot = rclpy.time.Time()
            try:
                self.get_logger().info("Looking up {}th time".format(_attempts))

                _transform_res = self._tf_buffer.lookup_transform(
                    target_f,
                    source_f,
                    _time_snapshot)
                break
            except TransformException as ex:
                if _MAX_ATTEMPTS < _attempts:
                    raise RuntimeError(
                        f"Could not transform '{target_f}' to '{source_f}' at the time '{_time_snapshot}'.: {ex}")
                else:
                    self.get_logger().warn("Failed {} times".format(_attempts))
                    time.sleep(1)
            _attempts += 1
        self.get_logger().info("x: {}, y: {}".format(
            _transform_res.transform.translation.x, _transform_res.transform.translation.y))

    def try2(self, target_f="kts2_table_frame", source_f="world"):
        self.get_logger().info("Getting it!")
        tf_future = self._tf_buffer.wait_for_transform_async(
            target_frame=target_f,
            source_frame=source_f,
            time=rclpy.time.Time())
        self.get_logger().info("wait_for_transform_async returned. tf_future: {}".format(tf_future))
        rclpy.spin_until_future_complete(self, tf_future)
        self.get_logger().info("spin_until_future_complete returned.")
        world_to_reference_transform = asyncio.run(self._tf_buffer.lookup_transform_async(
            target_f, source_f, rclpy.time.Time()))

rclpy.init()
node = TfFramesFinder()
#frame_parent = "turtle2"
frame_parent = "world"
#frame_child = "turtle2"
frame_child = "static_3"
try:
    rclpy.spin_once(node)
    node.try3(
        target_f=frame_parent,
        source_f=frame_child)
except KeyboardInterrupt as e:
    print("k/b interrupted")

node.destroy_node()
rclpy.shutdown()