【问题标题】:Parallel for loop, Python并行 for 循环,Python
【发布时间】:2017-05-02 21:18:13
【问题描述】:

目前,这个嵌套的 for 循环需要将近一个小时才能运行完。我希望重写它并创建一些并行同步。我在任何地方都没有找到关于如何像下面这样嵌套的答案。任何指向正确方向的指针将不胜感激。

  #used to update the Software Name's from softwareCollection using the regexCollection
    startTime = time.time()
    for x in softwareCollection.find({}, {"Software Name":-1,"Computer Name":-1,"Version":-1,"Publisher":-1,"reged": null }, no_cursor_timeout=True):
        for y in regexCollection.find({}, {"regName": 1,"newName":1}, no_cursor_timeout=True):
            try:
                regExp = re.compile(y["regName"])
            except:
                print(y["regName"])
                break
            oldName = x["Software Name"]
            newName = y["newName"]
            if(regExp.search(oldName)):
                x["Software Name"] = newName
                x["reged"] = "true"
                softwareCollection.save(x)
                break
            else:
                continue
    print(startTime - time.time() / 60)
    cursor.close()

【问题讨论】:

  • 你能进一步解释一下这是做什么的吗?
  • 所以正在做的是从 mongoDB 列中获取软件名称,并将其与我保存在单独的 mongo 集合中的正则表达式查询列表进行比较。如果名称与正则表达式匹配,则将字段重命名为与该正则表达式关联的任何名称。

标签: python parallel-processing


【解决方案1】:

根据x 的迭代次数,您可以为每个x 步骤生成一个线程,该线程将迭代y

首先,根据x定义运行函数:

def y_iteration(x):
    for y in ... :
        ...

然后在x 的每次迭代中生成一个运行此函数的线程:

for x in ... :
    _thread.start_new_thread(y_iteration, (x,))

这是一个非常基本的示例,使用低级 _thread 模块。

现在您可能需要加入一个主线程,在这种情况下,您需要使用threading 模块。您可能会将 x 迭代放在一个线程中并加入它:

def x_iteration():
    for x in ... :
        threading.Thread(target=y_iteration, args=(x,)).start()

thread = threading.Thread(target=x_iteration)
thread.start()
thread.join()

再一次,这取决于您计划在x 上进行的迭代次数(查看How many threads it too many?)。如果这个数字应该很大,您可能希望创建一个由 100 个工人组成的池,并用y_iteration 养活他们。当每个工人都在工作时,等到有一个空闲。

【讨论】:

  • 总共有 350 万个条目,所以我认为池化绝对是要走的路?
  • @Loglem 超过x 350 万次迭代?是的,这就是我的处理方式。
  • 是的,x 为 350 万,y 为 450。你有没有看到有人使用池导入做类似的事情的例子?
  • @Loglem 我想不出任何东西,尽管它对我来说似乎并不复杂。由于主要的一点是限制线程的数量,一个基本的实现将是保持一个(ugh)global 线程数,并在这个数量达到限制时等待。每个线程的函数(这里是y_iteration)在开始时增加这个计数,并在结束时减少它。这不完全是一个线程池,因为每个线程只工作一次......但这给了你这个想法。只需对线程列表而不是计数执行相同操作即可。
【解决方案2】:

所以我能够让它运行并且工作速度大约是顺序版本的两倍。我担心的是,完成这个过程仍然需要 4 个小时。有没有办法让这更有效,或者我应该期望这需要这么长时间。

#used to update the Software Name's from softwareCollection using the regexCollection
def foo(x):
    for y in regexCollection.find({}, {"regName": 1,"newName":1}, no_cursor_timeout=True):
        try:
            regExp = re.compile(y["regName"])
        except:
            print(y["regName"])
            break
        oldName = x["SoftwareName"]
        newName = y["newName"]
        if(regExp.search(oldName)):
            x["SoftwareName"] = newName
            x["field4"] = "reged"
            softwareCollection.save(x)
            break
        else:
            continue


if __name__ == '__main__':
    startTime = time.time()
    Parallel(n_jobs=4)(delayed(foo)(x) for x in softwareCollection.find())

    print(time.time() - startTime / 60)
    cursor.close()

【讨论】:

    猜你喜欢
    • 2016-08-10
    • 1970-01-01
    • 2017-03-17
    • 2014-08-05
    • 2019-01-31
    • 1970-01-01
    相关资源
    最近更新 更多