【发布时间】:2017-06-08 04:34:39
【问题描述】:
所以我使用进程和队列来搜索数据并找到在不同列中具有相同条目的行。我决定使用多处理来尝试使其可以针对大数据进行扩展。该文件有 1000 行和每行 10 个数据点。我只读取了 80 行数据并且程序停止了。 70 行,运行良好,速度也不错。
我的问题是我做错了什么,或者我没有发现这种方法的局限性?该代码无论如何都不是完美的,并且本身可能很糟糕。代码如下:
from multiprocessing import Process, Queue
import random
def openFile(file_name, k, division):
i = 0
dataSet = []
with open(file_name) as f:
for line in f:
stripLine = line.strip('\n')
splitLine = stripLine.split(division)
dataSet += [splitLine]
i += 1
if(i == k):
break
return(dataSet)
def setCombination(q,data1,data2):
newData = []
for i in range(0,len(data1)):
for j in range(0, len(data2)):
if(data1[i][1] == data2[j][3]):
newData += data2[j]
q.put(newData)
if __name__ == '__main__':
# Takes in the file, the length of the data to read in, and how the data is divided.
data = openFile('testing.txt', 80, ' ')
for i in range(len(data)):
for j in range(len(data[i])):
try:
data[i][j] = float(data[i][j])
except ValueError:
pass
#print(data)
k = len(data)//10
q = Queue()
processes = [Process(target=setCombination, args=(q, data[k*x: k + k*x], data))
for x in range(10)]
for p in processes:
p.start()
# Exit the completed processes
for p in processes:
p.join()
saleSet = [q.get() for p in processes]
print('\n', saleSet)
数据文件testing.txt
【问题讨论】:
-
10k 数据点似乎太少而无法从多处理中受益。尝试先编写最简单的单进程、单线程解决方案,然后考虑使用更高级别的结构(例如 `multiprocessing.Pool')并行化解决方案。
-
当使用
with open构造时,您不需要显式关闭文件。这就是 with 语句的好处 -
你的字面意思是程序无限期冻结 80 行数据,而不是 70 行数据?
-
看起来
Process(... args=...)中的数据量可能会起作用:70 行,data小到可以容纳(在哪里?),80,它不是。我要么在每个子进程中重新加载文件,要么传递data整体,并传递开始/结束索引。 -
@martineau 是程序无限冻结。
标签: python search multiprocessing shared-memory python-multiprocessing