【问题标题】:How to read file using multiprocessing pooling如何使用多处理池读取文件
【发布时间】:2017-12-22 14:24:37
【问题描述】:

我想读取内容约为 2GB 的文件,我尝试使用多处理池来执行,但出现错误:

TypeError: 'type' object is not iterable

我知道 map 总是接受可迭代的参数,但有没有办法做到这一点? 到目前为止,这是我的代码:

def load_embeddings(FileName):

    #file = open(FileName,'r')
    embeddings = {}
    i = 0
    print  "Loading word embeddings first time"
    for line in FileName:
             # print line

            tokens = line.split('\t')
            tokens[-1] = tokens[-1].strip()

            #each line has 400 tokens
            for i in xrange(1, len(tokens)):
                    tokens[i] = float(tokens[i])
                    embeddings[tokens[0]] = tokens[1:-1]
    print  "finished"
    return embeddings

if __name__ == "__main__":

    t1 = time.time()
    p = Pool(processes=5)
    FileName  = './asag/Resources/EN-wform.w.5.cbow.neg10.400.subsmpl.txt'
    file_ = open(FileName,'r')
    #fun = partial(load_embeddings,FileName) 
    result = p.map(load_embeddings, file_)
    p.close()
    p.join()
    print ("Time it took :" + str(time.time() - t1))

【问题讨论】:

  • 猜测:只需将文件名(字符串)传递给函数并让它打开文件。 Python files 不可腌制(无论大小),因此在使用 map() 时不能作为参数传递。
  • @Lucky 您是否希望使用多个 CPU 读取文件?
  • 我已经尝试过了,但是 map 函数至少需要参数,如何解决这个问题?我现在将文件位置直接传递给“load_embeddings”函数。
  • @ABDULNIYASPM 是的,多处理器。我正在处理集群,所以我想使用多个处理器来读取文件以加快处理速度。

标签: python python-2.7 multiprocessing python-multiprocessing


【解决方案1】:

如果在单进程环境中运行,您的源代码将是正确的。虽然你的论点FileName 应该命名为file,因为它实际上是一个打开的文件句柄而不是文件名(字符串)。

现在,您正在为 5 个进程提供相同的文件句柄来处理。使用for line in FileName,您可以对文件句柄进行读取操作。这在 5 个不同的进程中并行发生。所有人都不知道其他人(这就是它的美妙之处:对于操作系统来说,这些是运行的不同程序。但它们都从同一个文件句柄中读取)。现在,这似乎不是原子的,并且可以在仅部分读取该行后中断此调用。也可能是python在内部缓冲,但缓冲区是每个进程的。这导致在line 中有半行或第一行的一部分和第二行的一部分(因为python 只会读取直到它看到第一个\n),然后当你想进一步处理该行时会出现错误。

要解决此问题,您需要先在主进程中读取文件,然后将这些行交给map 函数,如下所示:

from multiprocessing import Pool

def load_embeddings(line):
    embeddings = {}
    i = 0
    tokens = line.split('\t')
    tokens[-1] = tokens[-1].strip()

    #each line has 400 tokens
    for i in xrange(1, len(tokens)):
            tokens[i] = float(tokens[i])
            embeddings[tokens[0]] = tokens[1:-1]
    print "finished"
    return embeddings

if __name__ == "__main__":
    p = Pool(processes=5)
    file_name  = 'file.tsf'
    lines = []
    with open(file_name,'r') as f:
        for line in f:
            lines.append(line.strip())

    result = p.map(load_embeddings, lines)
    p.close()
    p.join()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-12-21
    • 2020-06-15
    • 1970-01-01
    • 1970-01-01
    • 2020-07-03
    • 1970-01-01
    相关资源
    最近更新 更多