【问题标题】:Terminating multiprocess pool when one of the workers found proper solution当一名工作人员找到适当的解决方案时终止多进程池
【发布时间】:2017-12-19 09:34:41
【问题描述】:

我创建了一个程序,可以总结如下:

from itertools import combinations
class Test(object):
    def __init__(self, t2):
        self.another_class_object = t2

def function_1(self,n):
   a = 2
   while(a <= n):
       all_combs = combinations(range(n),a)
       for comb in all_combs:
           if(another_class_object.function_2(comb)):
              return 1
       a += 1
   return -1

函数combinations 是从itertools 导入的。 Function_2 根据输入返回 TrueFalse 并且是另一个类对象中的方法,例如:

class Test_2(object):

def __init__(self, list):
    self.comb_list = list

def function_2(self,c):
    return c in self.comb_list

一切正常。但现在我想稍微改变一下并实现多处理。我发现this topic 显示了一个示例,说明当其中一个工作进程确定不再需要完成工作时如何退出脚本。所以我做了以下更改:

  1. __init__方法中添加了池的定义:self.pool = Pool(processes=8)
  2. 创建了一个回调函数:

    all_results = []
    def callback_function(self, result):
        self.all_results.append(result)
        if(result):
            self.pool.terminate()
    
  3. 更改为function_1:

    def function_1(self,n):
        a = 2
        while(a <= n):
           all_combs = combinations(range(n),a)
           for comb in all_combs:
               self.pool.apply_async(self.another_class_object.function_2, args=comb, callback=self.callback_function)
           #self.pool.close()
           #self.pool.join()
           if(True in all_results):
               return 1
           a += 1
       return -1
    

不幸的是,它没有按我的预期工作。为什么?调试后,似乎永远无法到达回调函数。我以为每个工人都能接触到它。我错了吗?可能是什么问题?

【问题讨论】:

  • 对于初学者来说,不要将实例方法传递给您的 multiprocessing.Pool 设施(或一般的多处理代码),因为这是您不想打开的一大堆伤害。

标签: python python-3.x multiprocessing


【解决方案1】:

我没有尝试过您的代码,但我尝试了您的结构。您确定问题出在回调函数而不是工作函数中吗?如果函数是类方法,我没有设法让 apply_async 启动工作函数的单个实例。它只是没有做任何事情。 Apply_async 完成时没有错误,但它没有实现 worker。

一旦我将工作函数(在您的情况下为 another_class_object.function2)作为独立的全局函数移到类之外,它就开始按预期工作并且回调被正常触发。相比之下,回调函数似乎可以作为类方法正常工作。

这里似乎有关于这个的讨论:Why can I pass an instance method to multiprocessing.Process, but not a multiprocessing.Pool?

这有什么用吗?

汉努

【讨论】:

    【解决方案2】:

    问题:...不像我预期的那样工作。 ... 可能是什么问题?

    始终需要get() pool.apply_async(... 的结果才能查看池进程中的错误。

    更改如下:

    pp = []
    for comb in all_combs:
        pp.append(pool.apply_async(func=self.another_class_object.function_2, args=comb, callback=self.callback_function))
    
    pool.close()
    
    for ar in pp:
        print('ar=%s' % ar.get())
    

    你会看到这个错误:

    TypeError: function_2() takes 2 positional arguments but 3 were given
    

    修复此错误,将args=comb 更改为args=(comb,)

    pp.append(pool.apply_async(func=self.another_class_object.function_2, args=(comb,), callback=self.callback_function))
    

    用 Python 测试:3.4.2

    【讨论】:

      猜你喜欢
      • 2021-07-05
      • 1970-01-01
      • 1970-01-01
      • 2021-10-24
      • 2015-11-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-04-02
      相关资源
      最近更新 更多