The `rospy.wait_for_message` method does not work in test case
I am trying to write some tests for my python node my_node
. This node should start sending Float64
messages with value 0.0
on the wheel command topics after it recieves a MyMsg
message with a deflection field in z-axis set to higher than 0.005
(the threshold from param server). So I thought I would run the my_node
node, create another node in the test file that would send this message with deflection exceeding the threshold, and then wait for the messages from the both wheel command topics and see if they are zero.
I have confirmed that the my_node
is running. The threshold parameter is on the param server and is read by the my_node
. Also the wheels command topics are present in rospy.get_published_topics('my_robot')
list. The test runs as expected until it gets to the rospy.wait_for_message
where it gets stucked and the test ends with an error on the timeout.
It seems to me like this should work (the whole test script is shown below). Can you see any reason why this does not work? Or how to debug further?
the error:
Traceback (most recent call last):
File "/home/linux/catkin_ws/src/my_package/tests/scripts/test_my_node.py", line 103, in <module>
rosunit.unitrun('my_package', 'test_my_node', TestMyNode)
File "/opt/ros/kinetic/lib/python2.7/dist-packages/rosunit/pyunit.py", line 99, in unitrun
result = create_xml_runner(package, test_name, result_file).run(suite)
File "/opt/ros/kinetic/lib/python2.7/dist-packages/rosunit/xmlrunner.py", line 261, in run
test(result)
File "/usr/lib/python2.7/unittest/suite.py", line 70, in __call__
return self.run(*args, **kwds)
File "/usr/lib/python2.7/unittest/suite.py", line 108, in run
test(result)
File "/usr/lib/python2.7/unittest/case.py", line 393, in __call__
return self.run(*args, **kwds)
File "/usr/lib/python2.7/unittest/case.py", line 329, in run
testMethod()
File "/home/linux/catkin_ws/src/my_package/tests/scripts/test_my_node.py", line 89, in test_stop_messages_were_sent
left_wheel_msg = rospy.wait_for_message('/my_robot/joint_rear_left_wheel_controller/command', Float64)
File "/opt/ros/kinetic/lib/python2.7/dist-packages/rospy/client.py", line 426, in wait_for_message
raise rospy.exceptions.ROSInterruptException("rospy shutdown")
rospy.exceptions.ROSInterruptException: rospy shutdown
[Testcase: testTestMyNode] ... ERROR!
ERROR: max time [60.0s] allotted for test [TestMyNode] of type [my_package/test_my_node.py]
File "/usr/lib/python2.7/unittest/case.py", line 329, in run
testMethod()
File "/opt/ros/kinetic/lib/python2.7/dist-packages/rostest/runner.py", line 148, in fn
self.test_parent.run_test(test)
File "/opt/ros/kinetic/lib/python2.7/dist-packages/rostest/rostest_parent.py", line 132, in run_test
return self.runner.run_test(test)
File "/opt/ros/kinetic/lib/python2.7/dist-packages/roslaunch/launch.py", line 684, in run_test
(test.time_limit, test.test_name, test.package, test.type))
--------------------------------------------------------------------------------
[ROSTEST]-----------------------------------------------------------------------
SUMMARY
* RESULT: FAIL
* TESTS: 0
* ERRORS: 1
* FAILURES: 0
the test script:
#!/usr/bin/env python
import subprocess
import shlex
import sys
import signal
import psutil
import unittest
import roslaunch
import rospy
import rosnode
from my_msgs.msg import *
from std_msgs.msg import Float64
def kill_child_processes(parent_pid, sig=signal.SIGTERM ...