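【Question title】: Python multiprocessing not working as intended with fuzzywuzzy
【Posted】: 2019-12-21 18:31:28
【Question】:

My processes either start one after another, or they start (simultaneously) but never call the target function. I have tried many variations, and it does not work the way the many tutorials teach. My goal is to fuzzy-string-match a list of 80k text sentences and discard the unnecessary 90%+ matches while keeping the string that carries the most information (scorer=fuzz.token_set_ratio). Thanks!

The IDE is Anaconda Spyder 4.0, IPython 7.10.1, Python 3.7.5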

# -*- coding: utf-8 -*-
import pandas as pd
import multiprocessing
import time
from datetime import datetime
from fuzzywuzzy import fuzz
from fuzzywuzzy import process

#########
preparedDF = []
df1 = []
df2 = []
df3 = []
df4 = []
df5 = []
df6 = []
df7 = []
df8 = []
#########
xdf1 = []
xdf2 = []
xdf3 = []
xdf4 = []
xdf5 = []
xdf6 = []
xdf7 = []
xdf8 = []
#########

def fuzzyPrepare():
    #load data do some easy cleaning
    global preparedDF
    df = pd.read_csv("newEN.csv")
    df = df["description"].fillna("#####").tolist()
    df = list(dict.fromkeys(df))
    try:
        df.remove("#####")  # list.remove() works in place and returns None
    except ValueError:
        pass
    preparedDF=df

def fuzzySplit(df=preparedDF):
    #split data to feed processes
    global df1, df2, df3, df4, df5, df6, df7, df8
    df1 = df[:100]
    df2 = df[100:200]
    df3 = df[200:300]
    df4 = df[300:400]
    df5 = df[400:500]
    df6 = df[500:600]
    df7 = df[600:700]
    df8 = df[700:800]

def fuzzyMatch(x):
    #process.dedupe returns dict_keys object so pass it to a list()
    global xdf1, xdf2, xdf3, xdf4, xdf5, xdf6, xdf7, xdf8
    if x == 1:
        xdf1=list(process.dedupe(df1,threshold=90,scorer=fuzz.token_set_ratio))
    elif x == 2:
        xdf2=list(process.dedupe(df2,threshold=90,scorer=fuzz.token_set_ratio))
    elif x == 3:
        xdf3=list(process.dedupe(df3,threshold=90,scorer=fuzz.token_set_ratio))
    elif x == 4:
        xdf4=list(process.dedupe(df4,threshold=90,scorer=fuzz.token_set_ratio))
    elif x == 5:
        xdf5=list(process.dedupe(df5,threshold=90,scorer=fuzz.token_set_ratio))
    elif x == 6:
        xdf6=list(process.dedupe(df6,threshold=90,scorer=fuzz.token_set_ratio))
    elif x == 7:
        xdf7=list(process.dedupe(df7,threshold=90,scorer=fuzz.token_set_ratio))
    elif x == 8:
        xdf8=list(process.dedupe(df8,threshold=90,scorer=fuzz.token_set_ratio))
    else:
        return "error in fuzzyCases!"

#if __name__ == '__main__':
fuzzyPrepare()
fuzzySplit(preparedDF)
#UNHEEDED MULTIPROCESSING, ONLY THIS LINE TRIGGERS THE ACTUAL FUNCTION -> p1 = multiprocessing.Process(name="p1",target=fuzzyMatch(1), args=(1,))

p1 = multiprocessing.Process(name="p1",target=fuzzyMatch, args=(1,))
p2 = multiprocessing.Process(name="p2",target=fuzzyMatch, args=(2,))
p3 = multiprocessing.Process(name="p3",target=fuzzyMatch, args=(3,))
p4 = multiprocessing.Process(name="p4",target=fuzzyMatch, args=(4,))
p5 = multiprocessing.Process(name="p5",target=fuzzyMatch, args=(5,))
p6 = multiprocessing.Process(name="p6",target=fuzzyMatch, args=(6,))
p7 = multiprocessing.Process(name="p7",target=fuzzyMatch, args=(7,))
p8 = multiprocessing.Process(name="p8",target=fuzzyMatch, args=(8,))

jobs = []
jobs.append(p1)
jobs.append(p2)
jobs.append(p3)
jobs.append(p4)
jobs.append(p5)
jobs.append(p6)
jobs.append(p7)
jobs.append(p8)

for j in jobs:
    print("process "+ j.name +" started at "+ datetime.now().strftime('%H:%M:%S'))
    j.start()
    time.sleep(0.3)

for j in jobs:
    j.join()

print ("processing complete at "+datetime.now().strftime('%H:%M:%S'))

【Comments】:

  • multiprocessing does not share memory. Changes made to global variables after a Process is spawned are not visible between processes.
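
A side note on the commented-out line in the question, where target=fuzzyMatch(1) was the only variant that "triggered" the function: writing the call with parentheses runs the function immediately in the parent process and hands its return value to Process as the target. The sketch below illustrates this with a hypothetical report() helper (not from the question) that records the PID of whichever process executes it.

```python
import multiprocessing
import os


def report(x, queue):
    # Record which process actually executed this call.
    queue.put((x, os.getpid()))


def run_demo():
    queue = multiprocessing.Queue()
    parent_pid = os.getpid()

    # Intended form: pass the function object; the child calls report(1, queue).
    p = multiprocessing.Process(target=report, args=(1, queue))
    p.start()
    _, pid_of_first_call = queue.get()
    p.join()

    # Mistaken form: report(2, queue) runs right here in the parent, and its
    # return value (None) becomes the target, so the child has nothing to do.
    q = multiprocessing.Process(target=report(2, queue))
    q.start()
    q.join()
    _, pid_of_second_call = queue.get()

    return pid_of_first_call != parent_pid, pid_of_second_call == parent_pid


if __name__ == "__main__":
    ran_in_child, ran_in_parent = run_demo()
    print("target=report        ran in a child process:", ran_in_child)
    print("target=report(2, q)  ran in the parent:     ", ran_in_parent)
```

So that variant does the work serially in the parent, which would explain why it seemed to be the only one that worked.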

Tags: python multithreading duplicates multiprocessing fuzzywuzzy


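【Solution 1】:

OK, you are dealing with a non-trivial problem. I took the liberty of DRYing (Don't Repeat Yourself) your code a bit. I also do not have your data or pandas installed, so I have simplified the inputs and outputs. The principles are the same, though, and with few changes you should be able to get your code working!

Attempt #1

I have an array of 800 int elements, and each process computes the sum of 100 of them. Look for the # DRY: comments.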

# -*- coding: utf-8 -*-
import multiprocessing
import time
from datetime import datetime

#########
number_of_proc = 8
preparedDF = []
# DRY: This is now a list of lists. This allows us to refer to df1 as dfs[1]
dfs = []
# DRY: A dict of results. The key will be int (the process number!)
xdf = {}
#########

def fuzzyPrepare():
    global preparedDF
    # Generate fake data
    preparedDF = range(number_of_proc * 100)

def fuzzySplit(df):
    #split data to feed processes
    global dfs
    # DRY: Loop and generate N lists for N processes
    for i in range(number_of_proc):
        from_element = i * 100
        to_element = from_element + 100
        print("Packing [{}, {})".format(from_element, to_element))
        dfs.append(df[from_element:to_element])

def fuzzyMatch(x):
    global xdf
    # DRY: Since we now have a dict, all the if-else is not needed any more...
    xdf[x] = sum(dfs[x])
    print("In process: x={}, xdf[{}]={}".format(x, x, xdf[x]))


if __name__ == '__main__':
    fuzzyPrepare()
    fuzzySplit(preparedDF)

    # DRY: Create N processes AND append them
    jobs = []
    for p in range(number_of_proc):
        p = multiprocessing.Process(name="p{}".format(p),target=fuzzyMatch, args=(p,))
        jobs.append(p)

for j in jobs:
    print("process "+ j.name +" started at "+ datetime.now().strftime('%H:%M:%S'))
    j.start()
    time.sleep(0.3)

for j in jobs:
    j.join()

print ("processing complete at "+datetime.now().strftime('%H:%M:%S'))
print("results:")
for x in range(number_of_proc):
    print("In process: x={}, xdf[{}]={}".format(x, x, xdf[x]))

Output:

Packing [0, 100)
Packing [100, 200)
Packing [200, 300)
Packing [300, 400)
Packing [400, 500)
Packing [500, 600)
Packing [600, 700)
Packing [700, 800)
process p0 started at 19:12:00
In process: x=0, xdf[0]=4950
process p1 started at 19:12:00
In process: x=1, xdf[1]=14950
process p2 started at 19:12:00
In process: x=2, xdf[2]=24950
process p3 started at 19:12:01
In process: x=3, xdf[3]=34950
process p4 started at 19:12:01
In process: x=4, xdf[4]=44950
process p5 started at 19:12:01
In process: x=5, xdf[5]=54950
process p6 started at 19:12:01
In process: x=6, xdf[6]=64950
process p7 started at 19:12:02
In process: x=7, xdf[7]=74950
processing complete at 19:12:02
results:
Traceback (most recent call last):
  File "./tmp/proctest.py", line 58, in <module>
    print("In process: x={}, xdf[{}]={}".format(x, x, xdf[x]))
KeyError: 0

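What happened? I printed the values inside the processing function and they were there?!

Well, I am not an expert, but a Python process works much like fork(). The basic principle is that it spawns and initializes a new child process. The child process gets a COPY(!) of the parent's memory. This means the parent and child processes do not share any data/memory!!!

So in our case:

  • We prepare the data
  • We create N processes
  • Each process has its own copy of the dfs and xdf variables

For dfs we do not care much (it is only used as input), but each process now has its own xdf rather than the parent's! Do you see why the KeyError happens?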
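
To see the copy behaviour in isolation, here is a minimal sketch (not part of the original attempt; results, worker and run_demo are made-up names). The child fills a plain module-level dict and reports what it sees through a Queue, while the parent's dict stays empty.

```python
import multiprocessing

results = {}  # plain module-level dict, playing the role of xdf in attempt #1


def worker(x, queue):
    # This assignment only changes the child's own copy of `results`.
    results[x] = x * 10
    # Send the child's view back so the parent can compare.
    queue.put(dict(results))


def run_demo():
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(3, queue))
    p.start()
    child_view = queue.get()
    p.join()
    # The parent's dict was never touched by the child.
    return child_view, dict(results)


if __name__ == "__main__":
    child_view, parent_view = run_demo()
    print("child sees: ", child_view)
    print("parent sees:", parent_view)
```

Looking up results[3] in the parent afterwards would raise the same kind of KeyError as in the traceback above.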

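How to fix this (attempt #2)

Now it is clear that we need to return data from the processes to the parent. There are many ways to do this, but the simplest (code-wise) is to use multiprocessing.Manager to share data between your child processes (look for the # NEW: markers in the code - note that I only changed 2 lines!):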

# -*- coding: utf-8 -*-
import multiprocessing
import time
from datetime import datetime

# NEW: This can manage data between processes
from multiprocessing import Manager

#########
number_of_proc = 8
preparedDF = []
dfs = []
# NEW: we create a manager object to store the results
manager = Manager()
xdf = manager.dict()
#########

def fuzzyPrepare():
    global preparedDF
    # Generate fake data
    preparedDF = range(number_of_proc * 100)

def fuzzySplit(df):
    #split data to feed processes
    global dfs
    # DRY: Loop and generate N lists for N processes
    for i in range(number_of_proc):
        from_element = i * 100
        to_element = from_element + 100
        print("Packing [{}, {})".format(from_element, to_element))
        dfs.append(df[from_element:to_element])

def fuzzyMatch(x):
    global xdf
    # DRY: Since we now have a dict, all the if-else is not needed any more...
    xdf[x] = sum(dfs[x])
    print("In process: x={}, xdf[{}]={}".format(x, x, xdf[x]))


if __name__ == '__main__':
    fuzzyPrepare()
    fuzzySplit(preparedDF)

    # DRY: Create N processes AND append them
    jobs = []
    for p in range(number_of_proc):
        p = multiprocessing.Process(name="p{}".format(p),target=fuzzyMatch, args=(p,))
        jobs.append(p)

for j in jobs:
    print("process "+ j.name +" started at "+ datetime.now().strftime('%H:%M:%S'))
    j.start()
    time.sleep(0.3)

for j in jobs:
    j.join()

print ("processing complete at "+datetime.now().strftime('%H:%M:%S'))
print("results:")
for x in range(number_of_proc):
    print("Out of process: x={}, xdf[{}]={}".format(x, x, xdf[x]))

And the output:

Packing [0, 100)
Packing [100, 200)
Packing [200, 300)
Packing [300, 400)
Packing [400, 500)
Packing [500, 600)
Packing [600, 700)
Packing [700, 800)
process p0 started at 19:34:50
In process: x=0, xdf[0]=4950
process p1 started at 19:34:50
In process: x=1, xdf[1]=14950
process p2 started at 19:34:50
In process: x=2, xdf[2]=24950
process p3 started at 19:34:51
In process: x=3, xdf[3]=34950
process p4 started at 19:34:51
In process: x=4, xdf[4]=44950
process p5 started at 19:34:51
In process: x=5, xdf[5]=54950
process p6 started at 19:34:52
In process: x=6, xdf[6]=64950
process p7 started at 19:34:52
In process: x=7, xdf[7]=74950
processing complete at 19:34:52
results:
Out of process: x=0, xdf[0]=4950
Out of process: x=1, xdf[1]=14950
Out of process: x=2, xdf[2]=24950
Out of process: x=3, xdf[3]=34950
Out of process: x=4, xdf[4]=44950
Out of process: x=5, xdf[5]=54950
Out of process: x=6, xdf[6]=64950
Out of process: x=7, xdf[7]=74950

Read more about this here, and note the warning that Manager is slower than multiprocessing.Array (which would actually also solve your problem).
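
For reference, a rough sketch of what the multiprocessing.Array variant could look like for the simplified sum example. The names chunk_sum and run_demo are made up for illustration, and the "i" typecode (C int) is an assumption that only suits small integer results like these.

```python
import multiprocessing

NUMBER_OF_PROC = 8
CHUNK = 100


def chunk_sum(x, shared):
    # Each worker writes only to its own slot of the shared array.
    start = x * CHUNK
    shared[x] = sum(range(start, start + CHUNK))


def run_demo():
    # Shared, zero-initialised array of C ints, one slot per process.
    shared = multiprocessing.Array("i", NUMBER_OF_PROC)
    jobs = [
        multiprocessing.Process(target=chunk_sum, args=(x, shared))
        for x in range(NUMBER_OF_PROC)
    ]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    # Slicing copies the shared values into an ordinary list.
    return shared[:]


if __name__ == "__main__":
    for x, total in enumerate(run_demo()):
        print("Out of process: x={}, total={}".format(x, total))
```

Keep in mind that an Array holds fixed-type C values, so the lists of strings produced by process.dedupe would not fit into it directly. For that kind of result a Manager dict or a Queue is probably the easier route.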

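【Comments】:

  • More interesting information here (as I said, not an expert, so I read up on this myself :))
  • urban, thanks for your input. Copy-pasting your second attempt makes the kernel stop working (no output at all, same environment). I had to restart the kernel because it somehow froze.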
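
A guess at the freeze reported in the last comment, assuming (the comment does not say) a platform where multiprocessing starts children with the spawn method, which is the default on Windows. There each child re-imports the main module, so module-level statements such as manager = Manager() and the unguarded start/join loops would run again inside every child and may hang or fail. The sketch below is one way to restructure attempt #2 so that nothing runs at import time and the shared dict is passed to the workers explicitly. The names fuzzy_match and main are illustrative, and sum() is still only a stand-in for process.dedupe.

```python
import multiprocessing

NUMBER_OF_PROC = 8
CHUNK = 100


def fuzzy_match(x, chunk, xdf):
    # Stand-in for the real work; the result is stored in the managed dict.
    xdf[x] = sum(chunk)


def main():
    data = list(range(NUMBER_OF_PROC * CHUNK))
    chunks = [data[i * CHUNK:(i + 1) * CHUNK] for i in range(NUMBER_OF_PROC)]

    # The manager is created inside main(), not at import time.
    with multiprocessing.Manager() as manager:
        xdf = manager.dict()
        jobs = [
            multiprocessing.Process(target=fuzzy_match, args=(x, chunks[x], xdf))
            for x in range(NUMBER_OF_PROC)
        ]
        for j in jobs:
            j.start()
        for j in jobs:
            j.join()
        # Copy the results out before the manager shuts down.
        return dict(xdf)


if __name__ == "__main__":
    for x, total in sorted(main().items()):
        print("Out of process: x={}, xdf[{}]={}".format(x, x, total))
```

Running this as a script from a terminal, rather than from inside the IDE console, may also be worth trying.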