ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange |
1 | initial version |
I have the same performance issue and dig a bit... ,created a numpy_srv()
, similar with numpy_msg()
But it will only improve performance of byte[]
, not uint8[]
. By using this, to transfer 100MBytes bytes[]
:
But I think uint8[]
is already very fast for most uses.
import sys
import numpy as np
import rospy
from ax_msgs.srv import GetMapTest, GetMapTestRequest, GetMapTestResponse
from rospy.numpy_msg import numpy_msg
SIZE = 1024 * 1024 * 100
_numpy_srv_types = {}
def numpy_srv(srv_type):
if srv_type in _numpy_srv_types:
return _numpy_srv_types[srv_type]
classdict = {
"_type": srv_type._type,
"_md5sum": srv_type._md5sum,
"_request_class": numpy_msg(srv_type._request_class),
"_response_class": numpy_msg(srv_type._response_class),
}
# create the numpy message type
srv_type_name = "Numpy_%s" % srv_type._type.replace("/", "__")
numpy_type = type(srv_type_name, (srv_type,), classdict)
_numpy_srv_types[srv_type] = numpy_type
return numpy_type
GetMapTestNumpy = numpy_srv(GetMapTest) # numpy-lize service
# server
def get_map_bytes_service(req: GetMapTestRequest):
res = numpy_msg(GetMapTestResponse)() # Must use numpy_msg() to create the response
res.map_bytes = np.random.randint(0, 256, 1024 * 1024 * 10, dtype=np.uint8)
return res
if sys.argv[1] == "server":
# server
# create response content
map_bytes = np.random.randint(-128, 127, SIZE, dtype=np.int8)
def get_map_bytes_service(req: GetMapTestRequest):
res = numpy_msg(GetMapTestResponse)() # Must use numpy_msg() to create the response
res.map_bytes = map_bytes # map_bytes is `byte[]`
return res
rospy.init_node("my_node_server")
rospy.Service("/get_map_bytes", GetMapTestNumpy, get_map_bytes_service)
rospy.spin()
else:
# client
rospy.init_node("my_node_client")
func = rospy.ServiceProxy("/get_map_bytes", GetMapTestNumpy)
res: GetMapTestResponse = func(GetMapTestRequest())
print(f"len(res.map_bytes) = {len(res.map_bytes)}")
2 | No.2 Revision |
I have the same performance issue and dig a bit... ,created a numpy_srv()
, similar with numpy_msg()
But it will only improve performance of byte[]
, not uint8[]
. By using this, to transfer 100MBytes bytes[]
:
But I think uint8[]
is already very fast for most uses.
import sys
import numpy as np
import rospy
from ax_msgs.srv import GetMapTest, GetMapTestRequest, GetMapTestResponse
from rospy.numpy_msg import numpy_msg
SIZE = 1024 * 1024 * 100
_numpy_srv_types = {}
def numpy_srv(srv_type):
if srv_type in _numpy_srv_types:
return _numpy_srv_types[srv_type]
classdict = {
"_type": srv_type._type,
"_md5sum": srv_type._md5sum,
"_request_class": numpy_msg(srv_type._request_class),
"_response_class": numpy_msg(srv_type._response_class),
}
# create the numpy message type
srv_type_name = "Numpy_%s" % srv_type._type.replace("/", "__")
numpy_type = type(srv_type_name, (srv_type,), classdict)
_numpy_srv_types[srv_type] = numpy_type
return numpy_type
GetMapTestNumpy = numpy_srv(GetMapTest) # numpy-lize service
if sys.argv[1] == "server":
# server
# create response content
map_bytes = np.random.randint(0, 128, SIZE, dtype=np.uint8)
def get_map_bytes_service(req: GetMapTestRequest):
res = numpy_msg(GetMapTestResponse)() # Must use numpy_msg() to create the response
res.map_bytes = np.random.randint(0, 256, 1024 * 1024 * 10, dtype=np.uint8)
return res
if sys.argv[1] == "server":
# server
# create response content
map_bytes = np.random.randint(-128, 127, SIZE, dtype=np.int8)
def get_map_bytes_service(req: GetMapTestRequest):
res = numpy_msg(GetMapTestResponse)() # Must use numpy_msg() to create the response
res.map_bytes = map_bytes # map_bytes is `byte[]`
return res
rospy.init_node("my_node_server")
rospy.Service("/get_map_bytes", GetMapTestNumpy, get_map_bytes_service)
rospy.spin()
else:
# client
rospy.init_node("my_node_client")
func = rospy.ServiceProxy("/get_map_bytes", GetMapTestNumpy)
res: GetMapTestResponse = func(GetMapTestRequest())
print(f"len(res.map_bytes) = {len(res.map_bytes)}")