【问题标题】:Why aren't my Python Multiprocessing worker processes using multiple cores?为什么我的 Python 多处理工作进程不使用多核?
【发布时间】:2014-07-20 14:37:33
【问题描述】:

使用多处理中的 Pool 类,我将数据库搜索任务拆分为并行进程,每个进程针对我已加载到内存中的非常大的数据库运行一组正则表达式。 该程序在具有 60 多个内核和大量内存的强大 Windows 服务器上运行。

我的 Python 编程经验,尤其是多处理,是相当肤浅的。

当我第一次创建程序时,一切正常,每个工人都很好地处理了它的部分,然后继续处理下一个。我有几个月没有碰它,直到我不得不对数据库查询进行一些格式更改,但是当我再次启动它时,它运行得太慢了。 在测试中,我确定我生成的进程数量实际上并没有改变运行速度,并且确实查看任务管理器显示所有进程都在那里冷却,但其中只有一个实际上显示出任何工作迹象。

def calc(ruleList,record):
    returnList = []
    print(record[5],end = '\r')
    hits = recordIterator(ruleList,record)
    for h in hits:
        returnList.append([record[0],record[1],h])
    return returnList

nthreads = 48
hname = 'Hits.txt'
p = multiprocessing.Pool(processes = nthreads)
Hits = []
for record in Records:
    Hits.append((p.apply_async(calc, (rules, record))).get())

hhandle = open(hname, "w")
for hit in Hits:
    try:
        for x in hit:
            hhandle.write(str(x[0])+'|'+str(x[1])+'|'+str(x[2])+'\n')
    except (UnicodeEncodeError,UnicodeDecodeError):
        pass
hhandle.close()

我不是机器上的管理员,也不熟悉如何配置服务器,但在我看来,Windows 根本没有将子进程调度到单独的内核。 我尝试以多种不同的方式重新配置我的代码以避免潜在的多处理阻塞,但每个功能变体最终都会遇到同样的问题。

我的代码中是否有一些我遗漏的东西阻碍了流程? 是否有某些 Windows Server 设置可能已更改,以使我的工作人员无法使用单独的内核?

【问题讨论】:

    标签: windows python-3.x multiprocessing


    【解决方案1】:

    在我看来,代码(p.apply_async(calc, (rules, record))).get() 正在强制您的程序一次只运行一项作业。父进程将在get() 中等待上一个作业的结果可用,然后再启动下一个作业。

    尝试用一个对starmap的调用来替换Records和多个apply_async调用上的循环:

    Hits = p.starmap(calc, ((rules, record) for record in Records))
    

    这会将记录传递到池中,并且只有在它们都被发送块之后才会进入结果。

    【讨论】:

    • 我把它换成了循环,回溯告诉我我现在有一个“IndexError: list index out of range”。我对代码进行了一些修改,希望问题只是“规则”值本身就是一个列表,但此时我无法消除输入规则列表的需要。与此同时,我正在寻找其他一些方法来做到这一点。
    • 好的,我解决了这个问题。事实证明,没有一个映射函数可以处理包含在迭代旁边作为函数输入的第二个变量。在我的情况下,rules 本身就是一个列表列表,并且地图(地图、星图或 imap 相同)不知道如何将其与@987654330 中的record 的迭代一起交给calc @。我找到了一种将rules 嵌入到其定义中的calc 的方法,现在程序正在以适当的速度运行!
    【解决方案2】:

    充实@blckknght 的答案:apply_async() 提交工作,但.get() 要求立即获得结果。一个更简单的解决方案是提交所有作业,然后在输入时获取每个结果,而不管顺序如何。即使用imap_unordered()

    来源

    import multiprocessing
    
    def calc(num):
        return num*2
    
    pool = multiprocessing.Pool(5)
    for output in pool.imap_unordered(calc, [1,2,3]):
        print 'output:',output
    

    输出

    output: 2
    output: 4
    output: 6
    

    【讨论】:

    • 使用 imap_unordered 的功能与宣传的一样,谢谢!
    猜你喜欢
    • 2017-09-24
    • 2013-08-13
    • 1970-01-01
    • 1970-01-01
    • 2015-06-18
    • 2015-04-06
    • 1970-01-01
    • 1970-01-01
    • 2014-01-24
    相关资源
    最近更新 更多