Parallelize a for loop inside an ActionServer in Python

asked 2022-02-03 03:46:31 -0500

ravijoshi gravatar image

I have a time taking task which is defined inside ActionServer in Python. The task is called many times with different input arguments inside a for loop. I am trying to parallelize this loop using multiprocessing.Pool module. But no luck. Please see a code snippet below:

#! /usr/bin/env python3

import rospy
import time
import actionlib
import multiprocessing as mp
from parallel_package.msg import ExampleAction, ExampleResult


class ExampleActionServer(object):
    def __init__(self):
        self._as = actionlib.SimpleActionServer(
            "example",
            ExampleAction,
            execute_cb=self.cb,
            auto_start=False,
        )
        self._as.start()

    def time_taking_task(n):
        # do the task here and then return the status of the task
        time.sleep(n)
        a = 10  # sample value
        return a

    def cb(self, goal):
        with mp.Pool(processes=4) as pool:
            status = pool.map(self.time_taking_task, list(range(goal.value)))  # doesn't work
            # status = pool.map(time_taking_task, list(range(goal.value))) # works
        self._as.set_succeeded(ExampleResult(status))


if __name__ == "__main__":
    rospy.init_node("example_node")
    server = ExampleActionServer()
    rospy.spin()

Upon calling this action, it throws the following exception:

[ERROR] [1643879646.346031]: Exception in your execute callback: cannot pickle '_thread.RLock' object
Traceback (most recent call last):
  File "/opt/ros/noetic/lib/python3/dist-packages/actionlib/simple_action_server.py", line 289, in executeLoop
    self.execute_callback(goal)
  File "/home/user/ros_ws/src/actionlib_tutorials/scripts/fibonacci_server.py", line 34, in cb
    nums = pool.map(self.time_taking_task, list(range(goal.value)))
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.RLock' object

I tried many other modules such as joblib, concurrent, etc. but no luck. It seems that SimpleActionServer is not picklable.

Anyway, I want to know how to parallelize a for loop inside an action server in Python?

edit retag flag offensive close merge delete

Comments

Have you looked at TypeError: can't pickle _thread.lock objects?

The problem here is that self [..] can't be pickled as it is a class instance.

abhishek47 gravatar image abhishek47  ( 2022-02-05 06:30:10 -0500 )edit