【问题标题】:generating a list of arrays using multiprocessing in python在python中使用多处理生成数组列表
【发布时间】:2021-01-22 15:54:40
【问题描述】:

我很难实现并行化以生成数组列表。在这种情况下,每个数组都是独立生成的,然后附加到一个列表中。当我给它提供复杂的参数时,不知何故 multiprocessing.apply_asynch() 正在输出一个空数组。

更具体地说,只是为了给出上下文,我正在尝试使用并行化实现机器学习算法。这个想法如下:我有一个“系统”和一个在系统上执行操作的“代理”。为了教代理(在这种情况下是神经网络)如何以最佳方式表现(关于我在这里省略的某个奖励方案),代理需要通过对其应用动作来生成系统的轨迹。从执行动作获得的奖励中,智能体然后学习做什么和不做什么。请注意,代码中可能的操作被称为整数:

    possible_actions = [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]
    

所以我在这里尝试使用多处理生成许多这样的轨迹(抱歉,代码在这里无法运行,因为它需要许多其他文件,但我希望有人能发现这个问题):

    from quantum_simulator_EC import system
    from reinforce_keras_EC import Agent
    import multiprocessing as mp


    s = system(1200, N=3)
    s.set_initial_state([0,0,1])  
    agent = Agent(alpha=0.0003,  gamma=0.95, n_actions=len( s.actions ))


    def get_result(result):
        global action_batch
        action_batch.append(result)

    def generate_trajectory(s, agent):

        sequence_of_actions = []

        for k in range( 5 ):

            net_input = s.generate_net_input_FULL(6)
            action = agent.choose_action( net_input )
    
            sequence_of_actions.append(action)
    

        return sequence_of_actions
    
    action_batch = []

    pool = mp.Pool(2)
    for i in range(0, batch_size):
        pool.apply_async(generate_trajectory, args=(s,agent), callback=get_result)
    pool.close()
    pool.join()

    print(action_batch)

问题是代码返回一个空数组[]。有人可以向我解释问题是什么吗?我可以传递给 apply_asynch 的参数类型是否有限制?在这个例子中,我传递了我的系统's'和我的'代理',这两个都是复杂的对象。我之所以提到这一点,是因为当我使用整数或矩阵等简单参数而不是代理和系统测试我的代码时,它可以正常工作。如果没有明显的原因导致它不工作,如果有人有一些调试代码的技巧也会有帮助。

请注意,如果我不使用多处理,将最后一部分替换为:

    action_batch = []

    for i in range(0, batch_size):
        get_result( generate_sequence(s,agent) )

    print(action_batch)

在这种情况下,这里的输出与预期的一样,是 5 个动作的序列列表:

    [[4, 2, 1, 1, 7], [8, 2, 2, 12, 1], [8, 1, 9, 11, 9], [7, 10, 6, 1, 0]]

【问题讨论】:

  • 您是要返回完整的完成过程还是中间结果?在第一种情况下,您可以直接将来自pool.apply_async 的返回值分配给列表并使用result.get() 获取值,在第二种情况下,我建议使用队列。
  • 完整的完成过程。每个进程输出一个列表,然后应将其附加到另一个列表,该列表将包含所有生成的输出。通过将pool.apply_async 的返回值分配给一个列表,你的意思是我应该定义一个结果列表results = [],然后按以下方式多次附加apply_asyncresults.append( pool.apply_async(generate_trajectory, args=(s,agent) )
  • 是的,基本上,我已经写了一个答案来演示整个过程,因为它还需要closejoin 调用,如here 所述。

标签: python-3.x machine-learning python-multiprocessing apply-async


【解决方案1】:

最终结果可以直接附加到主进程中的列表中,无需创建回调函数。然后你可以closejoin这个pool,最后用get检索所有结果。

请参阅以下两个示例,使用apply_asyncstarmap_async,(请参阅this 帖子了解区别)。

解决方案apply

import multiprocessing as mp
import time


def func(s, agent):
    print(f"Working on task {agent}")
    time.sleep(0.1)  # some task
    return (s, s, s)


if __name__ == '__main__':
    agent = "My awesome agent"
    with mp.Pool(2) as pool:
        results = []
        for s in range(5):
            results.append(pool.apply_async(func, args=(s, agent)))
        pool.close()
        pool.join()

    print([result.get() for result in results])

解决方案starmap

import multiprocessing as mp
import time


def func(s, agent):
    print(f"Working on task {agent}")
    time.sleep(0.1)  # some task
    return (s, s, s)


if __name__ == '__main__':
    agent = "My awesome agent"
    with mp.Pool(2) as pool:
        result = pool.starmap_async(func, [(s, agent) for s in range(5)])
        pool.close()
        pool.join()

    print(result.get())
输出
Working on task My awesome agent
Working on task My awesome agent
Working on task My awesome agent
Working on task My awesome agent
Working on task My awesome agent
[(0, 0, 0), (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)]

【讨论】:

  • 所以我已经尝试了两种方式(使用 s 和 agent 来自我的代码的对象实例)。但是代码在result.get() 处生成错误,对于apply_asyncstarmap_async。错误是:TypeError: can't pickle weakref objects。你知道问题可能是什么吗?你给出的例子和我的代码之间唯一真正的区别是,在我的例子中 s 和 agent 是对象实例,它们交互生成整数列表,但我的猜测是它不应该更好......
  • 最终结果是可变的,传递时是不允许的。您可以在返回之前使它们不可变 (tuple(result))。
  • 所以在你的第一个例子中,我添加了results2 = tuple(results),然后我将打印语句修改为print( results2[0].get() )。我仍然收到错误TypeError: can't pickle weakref objects
  • 我可能在上一条消息中不清楚,来自generate_trajectory 的返回值应该转换为tuple。您的代理类也可能是不可选择的,在这种情况下,它应该在函数本身中重建。然后创建Process 可能会更容易。
  • 我尝试按照您的建议使函数返回 tuple,但仍然存在相同的错误消息。我还尝试将mp.Processmp.Queue 一起使用,但这也不起作用。问题是相似的:只要我用简单的整数或矩阵参数输入Process,它就可以工作,但是当我尝试输入系统和代理对象时它会挂起(即它无限期地运行,没有错误消息)。跨度>
猜你喜欢
  • 2017-03-05
  • 1970-01-01
  • 1970-01-01
  • 2013-01-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-10-08
相关资源
最近更新 更多