【发布时间】:2021-01-09 11:14:33
【问题描述】:
假设我们有以下脚本 (read_file.py),它读取一个文件并将前 200000 行写入另一个文件。
import pandas as pd
import sys
a = sys.argv[1]
b = sys.argv[2]
df = pd.read_csv(a, header=0, sep="\t").head(200000).to_csv(b, header=True, index=False)
让我们有第二个脚本 (test-latency.py),它通过多处理(在两个文件上)调用第一个脚本。然后读取生成的两个文件并合并。
import pandas as pd
import multiprocessing as mp
import sys
import subprocess
import time
a = sys.argv[1]
b = sys.argv[2]
l = [a, b]
pool = mp.Pool(processes = (mp.cpu_count() - 1))
for filename in l:
f_in = filename
f_out = filename + "out.tsv"
cmd = ['python', 'read_file.py', f_in, f_out]
pool.apply_async(subprocess.Popen, (cmd,))
pool.close()
pool.join()
time.sleep(1)
df1 = pd.read_csv(a + "out.tsv")
df2 = pd.read_csv(b + "out.tsv")
df = pd.merge(df1, df2, on="Name").to_csv("test.tsv", sep="\t", header=0)
问题在于,根据文件系统速度(由于 NFS 缓存),pool.join() 之后文件可能不存在。这通过time.sleep(1) 以某种方式解决,该延迟直到文件存在。但这不是最佳解决方案,因为对于慢速文件系统,它可能导致FileNotFoundError: [Errno 2]。一种解决方案是提供一个通用的延迟等待选项,但我认为让用户参与此类决策并不明智。您对此问题有何建议?
【问题讨论】:
-
tail读取底线,而不是顶线。你会想要head。 -
文件系统是基于状态的数据存储,而不是事件驱动的消息传递系统。不要将它们用作消息传递系统。
-
所以这是专门针对 NFS 的?
-
@Thomas 是的,抱歉。我更新了代码。
-
@Wups 发生在 NFS,但可能更普遍。
标签: python performance multiprocessing