My answer may be broader than what OP is asking, but because the initial motivation was the same for me to start tackling (static_transform not looked up via Python api in ROS2 for me), I share here what I found. It's embarrassing to admit that simple mistake.
Simply, I just had to run lookup AFTER spin at least once.
The following code is verified to lookup with turtle_tf2_py tutorial along with online tutorial (docs.ros.org/en/galactic/Tutorials/Intermediate/Tf2). Worked with static_transform
too.
Btw, I still haven't had "try2" approach working (the one @AndyZe suggested). I doubt there _may_ be some issues as I see rclpy#989? No idea.
#!/usr/bin/python3
import asyncio
import time
import yaml
import rclpy
import tf2_ros
from tf2_ros import TransformException
class TfFramesFinder(rclpy.node.Node):
def __init__(self):
super().__init__("tf2_frames_finder")
self._tf_buffer = tf2_ros.buffer.Buffer()
self._tf_listener = tf2_ros.transform_listener.TransformListener(self._tf_buffer, self, spin_thread = False)
def try3(self, target_f="kts2_table_frame", source_f="world"):
self.get_logger().info("Getting it3!")
_MAX_ATTEMPTS = 15
_attempts = 0
_transform_res = None
while _attempts < _MAX_ATTEMPTS:
_time_snapshot = rclpy.time.Time()
try:
self.get_logger().info("Looking up {}th time".format(_attempts))
_transform_res = self._tf_buffer.lookup_transform(
target_f,
source_f,
_time_snapshot)
break
except TransformException as ex:
if _MAX_ATTEMPTS < _attempts:
raise RuntimeError(
f"Could not transform '{target_f}' to '{source_f}' at the time '{_time_snapshot}'.: {ex}")
else:
self.get_logger().warn("Failed {} times".format(_attempts))
time.sleep(1)
_attempts += 1
self.get_logger().info("x: {}, y: {}".format(
_transform_res.transform.translation.x, _transform_res.transform.translation.y))
def try2(self, target_f="kts2_table_frame", source_f="world"):
self.get_logger().info("Getting it!")
tf_future = self._tf_buffer.wait_for_transform_async(
target_frame=target_f,
source_frame=source_f,
time=rclpy.time.Time())
self.get_logger().info("wait_for_transform_async returned. tf_future: {}".format(tf_future))
rclpy.spin_until_future_complete(self, tf_future)
self.get_logger().info("spin_until_future_complete returned.")
world_to_reference_transform = asyncio.run(self._tf_buffer.lookup_transform_async(
target_f, source_f, rclpy.time.Time()))
rclpy.init()
node = TfFramesFinder()
#frame_parent = "turtle2"
frame_parent = "world"
#frame_child = "turtle2"
frame_child = "static_3"
try:
rclpy.spin_once(node)
node.try3(
target_f=frame_parent,
source_f=frame_child)
except KeyboardInterrupt as e:
print("k/b interrupted")
node.destroy_node()
rclpy.shutdown()