【问题标题】:python multiprocessing scheduling taskpython多进程调度任务
【发布时间】:2013-07-25 11:37:30
【问题描述】:

我有 8 个 CPU 内核和 200 个任务要完成。每个任务都是隔离的。无需等待或分享结果。我正在寻找一种方法来一次运行 8 个任务/进程(最大)并且其中一个完成。剩下的任务会自动启动进程。

如何知道子进程何时完成并启动新的子进程。首先,我尝试使用进程(多处理),但很难弄清楚。然后我尝试使用池并面对泡菜问题,因为我需要使用动态实例化。

已编辑:添加我的池代码

class Collectorparallel():

def fire(self,obj):
    collectorController = Collectorcontroller()
    collectorController.crawlTask(obj)

def start(self):
    log_to_stderr(logging.DEBUG)
    pluginObjectList = []
    for pluginName in self.settingModel.getAllCollectorName():
        name = pluginName.capitalize()
        #Get plugin class and instanitiate object
        module = __import__('plugins.'+pluginName,fromlist=[name])
        pluginClass = getattr(module,name)
        pluginObject = pluginClass()
        pluginObjectList.append(pluginObject)



    pool = Pool(8)
    jobs = pool.map(self.fire,pluginObjectList)
    pool.close()

    print pluginObjectList

pluginObjectList 有类似

[<plugins.name1.Name1 instance at 0x1f54290>, <plugins.name2.Name2 instance at 0x1f54f38>]

PicklingError: Can't pickle : 属性查找 builtin.instancemethod 失败

但进程版本工作正常

【问题讨论】:

  • 解决方法是使用multiprocessing.Pool。如何解决window的pickling问题完全就看代码了。 发布一些代码!如果您可以编写一个 smallself-contained 示例,我们可以将其用于测试并清楚地显示您的用例。
  • @Bakuriu 我已经添加了我的代码

标签: python multiprocessing


【解决方案1】:

警告这对部署和情况有点主观,但我目前的设置如下

我有一个工作程序,我启动了 6 个副本(我有 6 个内核)。 每个工人都做以下事情;

  1. 连接到 Redis 实例
  2. 尝试弹出特定列表中的一些作品
  3. 推送日志信息
  4. 空闲或因“队列”中缺少工作而终止

然后,每个程序基本上都是独立的,同时仍然使用单独的排队系统完成您需要的工作。由于您的流程没有中间人,这可能是您的问题的解决方案。

【讨论】:

    【解决方案2】:

    我不是 Python 多处理方面的专家,但我在此帮助 http://www.tutorialspoint.com/python/python_multithreading.htm 和这个http://www.devshed.com/c/a/Python/Basic-Threading-in-Python/1/ 的帮助下尝试了一些事情。

    例如,您可以使用此方法isAlive 来回答您的问题。

    【讨论】:

      【解决方案3】:

      您的问题的解决方案是微不足道的。首先,请注意 methods 不能被腌制。实际上只有pickle's documentation 中列出的类型可以腌制:

      • NoneTrueFalse
      • 整数、长整数、浮点数、复数
      • 普通和 Unicode 字符串
      • tuples、lists、sets 和 dictionaries 仅包含可腌制对象
      • 函数在模块的顶层定义
      • 内置函数在模块的顶层定义
      • 在模块顶层定义的类
      • 此类的实例,其__dict__ 或调用__getstate__() 的结果是可腌制的(请参阅腌制协议部分 了解详情)。

      [...]

      请注意,函数(内置和用户定义的)由 “完全限定”的名称引用,不是按值。这意味着 只腌制函数名称,以及定义函数的模块名称。既不是函数的代码,也不是任何 它的功能属性是腌制的。因此定义模块必须是 可在 unpickling 环境中导入,并且模块必须包含 命名对象,否则将引发异常。 [4]

      同样,类由命名引用腌制,所以相同 unpickling 环境中的限制适用。请注意,没有 类的代码或数据被腌制[...]

      显然,方法不是在模块顶层定义的函数,因此它不能被腌制。(仔细阅读文档的那部分以避免将来出现腌制问题!)但是替换它绝对是微不足道的具有全局函数并将self 作为附加参数传递的方法:

      import itertools as it
      
      
      def global_fire(argument):
          self, obj = argument
          self.fire(obj)
      
      
      class Collectorparallel():
      
          def fire(self,obj):
              collectorController = Collectorcontroller()
              collectorController.crawlTask(obj)
      
          def start(self):
              log_to_stderr(logging.DEBUG)
              pluginObjectList = []
              for pluginName in self.settingModel.getAllCollectorName():
                  name = pluginName.capitalize()
                  #Get plugin class and instanitiate object
                  module = __import__('plugins.'+pluginName,fromlist=[name])
                  pluginClass = getattr(module,name)
                  pluginObject = pluginClass()
                  pluginObjectList.append(pluginObject)
      
      
      
              pool = Pool(8)
              jobs = pool.map(global_fire, zip(it.repeat(self), pluginObjectList))
              pool.close()
      
              print pluginObjectList
      

      请注意,由于Pool.map 仅使用一个参数调用给定函数,我们必须将self 和实际参数“打包”在一起。为此,我有 zipped it.repeat(self) 和原始可迭代对象。

      如果您不关心调用完成的顺序,那么使用pool.imap_unordered可能会提供更好的性能。然而它返回一个可迭代的而不是一个列表,所以如果你想要结果列表,你必须做jobs = list(pool.imap_unordered(...))

      【讨论】:

        【解决方案4】:

        我相信这段代码会解决所有酸洗问题。

        class Collectorparallel():
        
        def __call__(self,cNames):
            for pluginName in cNames:
                name = pluginName.capitalize()
                #Get plugin class and instanitiate object
                module = __import__('plugins.'+pluginName,fromlist=[name])
                pluginClass = getattr(module,name)
                pluginObject = pluginClass()
                pluginObjectList.append(pluginObject)
        
            collectorController = Collectorcontroller()
            collectorController.crawlTask(obj)
        
        def start(self):
            log_to_stderr(logging.DEBUG)
            pool = Pool(8)
            jobs = pool.map(self,self.settingModel.getAllCollectorName())
            pool.close()
        

        这里发生的事情是Collectorparallel 已变成可调用对象。插件名称列表用作池的可迭代对象,插件的实际确定及其实例化在每个工作进程中完成,类​​实例对象用作每个工作进程的可调用对象。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2020-05-09
          • 2011-06-12
          • 2021-02-09
          • 1970-01-01
          • 2020-10-21
          • 2012-06-20
          • 2013-11-27
          • 2018-08-08
          相关资源
          最近更新 更多