【问题标题】:Multiprocessing list search多处理列表搜索
【发布时间】:2017-06-08 04:34:39
【问题描述】:

所以我使用进程和队列来搜索数据并找到在不同列中具有相同条目的行。我决定使用多处理来尝试使其可以针对大数据进行扩展。该文件有 1000 行和每行 10 个数据点。我只读取了 80 行数据并且程序停止了。 70 行,运行良好,速度也不错。

我的问题是我做错了什么,或者我没有发现这种方法的局限性?该代码无论如何都不是完美的,并且本身可能很糟糕。代码如下:

from multiprocessing import Process, Queue
import random

def openFile(file_name, k, division):
    i = 0
    dataSet = []
    with open(file_name) as f:
        for line in f:
            stripLine = line.strip('\n')
            splitLine = stripLine.split(division)
            dataSet += [splitLine]
            i += 1
            if(i == k):
                break

    return(dataSet)

def setCombination(q,data1,data2):
    newData = []
    for i in range(0,len(data1)):
        for j in range(0, len(data2)):
            if(data1[i][1] == data2[j][3]):
                newData += data2[j]
    q.put(newData)

if __name__ == '__main__':
    # Takes in the file, the length of the data to read in, and how the data is divided.
    data = openFile('testing.txt', 80, ' ')
    for i in range(len(data)):
        for j in range(len(data[i])):
            try:
                data[i][j] = float(data[i][j])
            except ValueError:
                 pass

    #print(data)
    k = len(data)//10
    q = Queue()
    processes = [Process(target=setCombination, args=(q, data[k*x: k + k*x], data))
                                                                for x in range(10)]
    for p in processes:
        p.start()

    # Exit the completed processes
    for p in processes:
        p.join()

    saleSet = [q.get() for p in processes]
    print('\n', saleSet)

数据文件testing.txt

【问题讨论】:

  • 10k 数据点似乎太少而无法从多处理中受益。尝试先编写最简单的单进程、单线程解决方案,然后考虑使用更高级别的结构(例如 `multiprocessing.Pool')并行化解决方案。
  • 当使用with open 构造时,您不需要显式关闭文件。这就是 with 语句的好处
  • 你的字面意思是程序无限期冻结 80 行数据,而不是 70 行数据?
  • 看起来Process(... args=...) 中的数据量可能会起作用:70 行,data 小到可以容纳(在哪里?),80,它不是。我要么在每个子进程中重新加载文件,要么传递 data 整体,并传递开始/结束索引。
  • @martineau 是程序无限冻结。

标签: python search multiprocessing shared-memory python-multiprocessing


【解决方案1】:

您的代码所做的某些事情似乎导致了死锁。在实验过程中,我注意到 10 个任务中有 3 个永远不会终止,但老实说,我真的不知道原因。

好消息是它很容易通过删除或禁用来修复

# Exit the completed processes
for p in processes:
    p.join()

你的代码中有循环。

这是您的代码的完整版本,其中(大部分)只是进行了修改:

from multiprocessing import Process, Queue

def openFile(file_name, k, division):
    i = 0
    dataSet = []
    with open(file_name) as f:
        for line in f:
            stripLine = line.strip('\n')
            splitLine = stripLine.split(division)
            dataSet += [splitLine]
            i += 1
            if i == k:
                break

    return dataSet

def setCombination(q, data1, data2):
    newData = []
    for i in range(len(data1)):
        for j in range(len(data2)):
            if data1[i][1] == data2[j][3]:
                newData += data2[j]
    q.put(newData)

if __name__ == '__main__':
    # Takes in the file, the length of the data to read in, and how the data is divided.
    data = openFile('testing.txt', 80, ' ')

    for i in range(len(data)):
        for j in range(len(data[i])):
            try:
                data[i][j] = float(data[i][j])
            except ValueError:
                 pass

    k = len(data) // 10
    q = Queue()
    processes = [Process(target=setCombination, args=(q, data[k*x: k*x+k], data))
                    for x in range(10)]
    for p in processes:
        p.start()

# NO LONGER USED (HANGS)
#    # Exit the completed processes
#    for p in processes:
#        p.join()

    # note: this works since by default, get() will block until it can retrieve something
    saleSet = [q.get() for _ in processes]  # a queue item should be added by each Process
    print('\n', saleSet)

【讨论】:

  • 您能否解释一下您是如何确定您发现 10 个任务中有 3 个永远不会终止的?有没有办法一次启动所有任务而不是 for 循环,但达到相同的结果。如果我在比较字符串,会知道我需要改变什么吗?
  • 我通过将代码中的for p in processes:p.join() 循环替换为打印了0s 和1s 字符串的循环来识别它们,表明10 个进程中的哪一个是@987654327 @ 一遍又一遍,直到结果全为零。这很快就会变成0000000111,但最后三个任务永远不会终止,所以循环永远不会结束。我知道没有办法同时启动它们。但不要认为这会改变任何事情。 setCombination() 函数将比较作为参数传递给它的 data1data2 列表中的任何类型的数据。
  • 事后思考:查看我的最新版本,无需切换到使用 JoinableQueue 即可工作。
  • 这会提高效率吗?还是它使代码更易于阅读?或两者?我确实有另一个关于 python 的问题要问你。我可以通过引用进程来传递数据,这样我就不会在更大的数据上出现内存错误吗?
  • 不知道它是否更有效,我想代码可读性差不多。我使用JoinableQueue 的原始答案有点骇人听闻,因为后者实际上是设计用于队列 consumer 线程,而不是生产者线程。您不能通过引用传递任何内容,因为每个任务都使用单独的 Python 解释器在其自己的内存空间中运行。多处理Queues 是用管道实现的,因此您应该能够处理大量数据。继续...
猜你喜欢
  • 2018-06-28
  • 1970-01-01
  • 2016-06-14
  • 1970-01-01
  • 2013-09-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-10-24
相关资源
最近更新 更多