【问题标题】:Processing Large Files in Python [ 1000 GB or More]在 Python 中处理大文件 [1000 GB 或更多]
【发布时间】:2014-07-09 01:02:24
【问题描述】:

假设我有一个 1000 GB 的文本文件。我需要找出一个短语在文本中出现的次数。

有没有比我下面使用的更快的方法来做到这一点? 完成任务需要多少时间。

phrase = "how fast it is"
count = 0
with open('bigfile.txt') as f:
    for line in f:
        count += line.count(phrase)

如果我是对的,如果我在内存中没有这个文件,我会等到每次我进行搜索时 PC 加载文件,这对于 250 MB/秒的硬盘至少需要 4000 秒驱动器和 10000 GB 的文件。

【问题讨论】:

  • grep肯定会比python快。
  • 您的代码目前无法处理短语在文件中被拆分为多行的情况。
  • @jonrsharpe 您是否建议将整个文件放入内存中?
  • 多线程无助于解决 IO 瓶颈,充其量只会让你的程序更复杂,更糟糕的是,你的硬盘磁头会乱扔垃圾,读取速度会慢得多。如果您确实需要尽快处理此文件,则需要将文件拆分到多个硬盘上。这可以通过在 RAID 配置中使用多个硬盘或通过在多台机器上并行处理文件然后将结果聚合到一个中来实现。
  • @Brana 我认为您应该编辑原始问题以指定您将针对此文件执行多次短语搜索(似乎限制为 3-5 个单词?),因为它显着改变了人们的回答方式。

标签: python performance file python-2.7 text


【解决方案1】:

您是否考虑过为您的文件编制索引?搜索引擎的工作方式是创建从单词到它们在文件中的位置的映射。说如果你有这个文件:

Foo bar baz dar. Dar bar haa.

您创建的索引如下所示:

{
    "foo": {0},
    "bar": {4, 21},
    "baz": {8},
    "dar": {12, 17},
    "haa": {25},
}

可以在 O(1) 中查找哈希表索引;所以它的速度非常快。

有人搜索查询“bar baz”,您首先将查询分解为其组成词:["bar", "baz"],然后您找到 {4, 21}, {8};然后你用它直接跳到可能存在查询文本的地方。

索引搜索引擎也有现成的解决方案;例如SolrElasticSearch

【讨论】:

  • 这很有趣。我需要句子中的位置以及文件中句子的位置才能进行印迹搜索。但这似乎只是为了使搜索更快,并且可能无法压缩文件。我所做的只是简单地将所有 2 3 和 4 和 5 克写在 10 000 多个文件中,其名称对应于印迹开头的字母。
【解决方案2】:

这是第三种更长的使用数据库的方法。数据库肯定比文本大。我不确定索引是否是最佳的,并且可以通过使用它来节省一些空间。 (比如,也许 WORD 和 POS,WORD 更好,或者 WORD,POS 就可以了,需要尝试一下)。

这可能在 200 OK 的测试中表现不佳,因为它包含大量重复文本,但可能在更多独特数据上表现良好。

首先通过扫描单词等创建数据库:

import sqlite3
import re

INPUT_FILENAME = 'bigfile.txt'
DB_NAME = 'words.db'
FLUSH_X_WORDS=10000


conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()


cursor.execute("""
CREATE TABLE IF NOT EXISTS WORDS (
     POS INTEGER
    ,WORD TEXT
    ,PRIMARY KEY( POS, WORD )
) WITHOUT ROWID
""")

cursor.execute("""
DROP INDEX IF EXISTS I_WORDS_WORD_POS
""")

cursor.execute("""
DROP INDEX IF EXISTS I_WORDS_POS_WORD
""")


cursor.execute("""
DELETE FROM WORDS
""")

conn.commit()

def flush_words(words):
    for word in words.keys():
        for pos in words[word]:
            cursor.execute('INSERT INTO WORDS (POS, WORD) VALUES( ?, ? )', (pos, word.lower()) )

    conn.commit()

words = dict()
pos = 0
recomp = re.compile('\w+')
with open(INPUT_FILENAME, 'r') as f:
    for line in f:

        for word in [x.lower() for x in recomp.findall(line) if x]:
            pos += 1
            if words.has_key(word):
                words[word].append(pos)
            else:
                words[word] = [pos]
        if pos % FLUSH_X_WORDS == 0:
            flush_words(words)
            words = dict()
    if len(words) > 0:
        flush_words(words)
        words = dict()


cursor.execute("""
CREATE UNIQUE INDEX I_WORDS_WORD_POS ON WORDS ( WORD, POS )
""")

cursor.execute("""
CREATE UNIQUE INDEX I_WORDS_POS_WORD ON WORDS ( POS, WORD )
""")

cursor.execute("""
VACUUM
""")

cursor.execute("""
ANALYZE WORDS
""")

然后通过生成SQL搜索数据库:

import sqlite3
import re

SEARCH_PHRASE = 'how fast it is'
DB_NAME = 'words.db'


conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()

recomp = re.compile('\w+')

search_list = [x.lower() for x in recomp.findall(SEARCH_PHRASE) if x]

from_clause = 'FROM\n'
where_clause = 'WHERE\n'
num = 0
fsep = '     '
wsep = '     '
for word in search_list:
    num += 1
    from_clause += '{fsep}words w{num}\n'.format(fsep=fsep,num=num)
    where_clause += "{wsep} w{num}.word = '{word}'\n".format(wsep=wsep, num=num, word=word)
    if num > 1:
        where_clause += "  AND w{num}.pos = w{lastnum}.pos + 1\n".format(num=str(num),lastnum=str(num-1))

    fsep = '    ,'
    wsep = '  AND'


sql = """{select}{fromc}{where}""".format(select='SELECT COUNT(*)\n',fromc=from_clause, where=where_clause)

res = cursor.execute( sql )

print res.fetchone()[0] 

【讨论】:

  • NP,希望这些方法之一有所帮助。我相信有一个更好的版本可以实现,有很多搜索字符串算法,你只需要一个可以在磁盘上高效工作的算法。
  • 我会先使用200 OK的解决方案,然后尝试使用你的multithresding sp;utiom来提高速度。
  • 对于较小的文件,这也是一个很好的解决方案,我将尝试使用带有 txt 文件的解决方案,因为拥有多个文件比拥有多个数据库更容易。
【解决方案3】:

我们讨论的是在相当大的数据流中对特定子字符串进行简单计数。该任务几乎肯定受 I/O 限制,但很容易并行化。第一层是原始读取速度;我们可以选择通过压缩来减少读取量,或者通过将数据存储在多个位置来分配传输速率。然后我们有搜索本身;子字符串搜索是一个众所周知的问题,同样是 I/O 受限。如果数据集来自单个磁盘,那么几乎任何优化都没有实际意义,因为磁盘无法在速度上击败单个内核。

假设我们确实有块,例如,可能是 bzip2 文件的单独块(如果我们使用线程解压缩器)、RAID 中的条带或分布式节点,我们可以从单独处理它们中获益良多。每个块都搜索needle,然后可以通过从一个块的末尾和下一个块的开头获取len(needle)-1,并在其中搜索来形成关节。

快速基准测试表明,正则表达式状态机的运行速度比通常的 in 运算符快:

>>> timeit.timeit("x.search(s)", "s='a'*500000; import re; x=re.compile('foobar')", number=20000)
17.146117210388184
>>> timeit.timeit("'foobar' in s", "s='a'*500000", number=20000)
24.263535976409912
>>> timeit.timeit("n in s", "s='a'*500000; n='foobar'", number=20000)
21.562405109405518

假设我们在文件中有数据,我们可以执行的另一个优化步骤是 mmap 它,而不是使用通常的读取操作。这允许操作系统直接使用磁盘缓冲区。它还允许内核以任意顺序满足多个读取请求,而无需进行额外的系统调用,这使我们能够在多个线程中运行时利用底层 RAID 之类的东西。

这是一个快速拼凑起来的原型。一些事情显然可以改进,例如如果我们有一个多节点集群,则分配块进程,通过将一个传递给相邻的工作人员(在此实现中未知的顺序)来进行尾部+头部检查,而不是将两者都发送到一个特殊的工作者,并实现一个线程间限制队列(管道)类而不是匹配信号量。将工作线程移到主线程函数之外可能也是有意义的,因为主线程不断改变其局部变量。

from mmap import mmap, ALLOCATIONGRANULARITY, ACCESS_READ
from re import compile, escape
from threading import Semaphore, Thread
from collections import deque

def search(needle, filename):
    # Might want chunksize=RAID block size, threads
    chunksize=ALLOCATIONGRANULARITY*1024
    threads=32
    # Read chunk allowance
    allocchunks=Semaphore(threads)  # should maybe be larger
    chunkqueue=deque()   # Chunks mapped, read by workers
    chunksready=Semaphore(0)
    headtails=Semaphore(0)   # edges between chunks into special worker
    headtailq=deque()
    sumq=deque()     # worker final results

    # Note: although we do push and pop at differing ends of the
    # queues, we do not actually need to preserve ordering. 

    def headtailthread():
        # Since head+tail is 2*len(needle)-2 long, 
        # it cannot contain more than one needle
        htsum=0
        matcher=compile(escape(needle))
        heads={}
        tails={}
        while True:
            headtails.acquire()
            try:
                pos,head,tail=headtailq.popleft()
            except IndexError:
                break  # semaphore signaled without data, end of stream
            try:
                prevtail=tails.pop(pos-chunksize)
                if matcher.search(prevtail+head):
                    htsum+=1
            except KeyError:
                heads[pos]=head
            try:
                nexthead=heads.pop(pos+chunksize)
                if matcher.search(tail+nexthead):
                    htsum+=1
            except KeyError:
                tails[pos]=tail
        # No need to check spill tail and head as they are shorter than needle
        sumq.append(htsum)

    def chunkthread():
        threadsum=0
        # escape special characters to achieve fixed string search
        matcher=compile(escape(needle))
        borderlen=len(needle)-1
        while True:
            chunksready.acquire()
            try:
                pos,chunk=chunkqueue.popleft()
            except IndexError:   # End of stream
                break
            # Let the re module do the heavy lifting
            threadsum+=len(matcher.findall(chunk))
            if borderlen>0:
                # Extract the end pieces for checking borders
                head=chunk[:borderlen]
                tail=chunk[-borderlen:]
                headtailq.append((pos,head,tail))
                headtails.release()
            chunk.close()
            allocchunks.release()  # let main thread allocate another chunk
        sumq.append(threadsum)

    with infile=open(filename,'rb'):
        htt=Thread(target=headtailthread)
        htt.start()
        chunkthreads=[]
        for i in range(threads):
            t=Thread(target=chunkthread)
            t.start()
            chunkthreads.append(t)
        pos=0
        fileno=infile.fileno()
        while True:
            allocchunks.acquire()
            chunk=mmap(fileno, chunksize, access=ACCESS_READ, offset=pos)
            chunkqueue.append((pos,chunk))
            chunksready.release()
            pos+=chunksize
            if pos>chunk.size():   # Last chunk of file?
                break
        # File ended, finish all chunks
        for t in chunkthreads:
            chunksready.release()   # wake thread so it finishes
        for t in chunkthreads:
            t.join()    # wait for thread to finish
        headtails.release()     # post event to finish border checker
        htt.join()
        # All threads finished, collect our sum
        return sum(sumq)

if __name__=="__main__":
    from sys import argv
    print "Found string %d times"%search(*argv[1:])

另外,修改整个事情以使用一些 mapreduce 例程(将块映射到计数、头部和尾部,通过求和计数和检查尾部+头部部分来减少)作为练习。

编辑:由于这个搜索似乎会用不同的针重复,索引会更快,能够跳过已知不匹配的部分的搜索。一种可能性是制作一张地图,其中包含任何出现的各种 n-gram(通过允许 ngram 重叠到下一个来考虑块边界);在需要加载原始数据块之前,可以组合这些地图以找到更复杂的条件。当然有数据库可以做到这一点。寻找全文搜索引擎。

【讨论】:

  • 我刚刚注意到.. 如果文件不是块大小的倍数,mmap 将在最后一个块上失败。我们需要先读取文件大小,然后减小最后一个块的大小。
  • ngram 索引本身可以通过制作布隆过滤器来减小大小。
  • 这是一个很酷的解决方案。如果我在前 5 个字母或一个印迹之后命名文件,我可以这样做会更简单。我会像这样排列成对:“印迹”,“频率”。例如:“小红鞋”,“122”。
【解决方案4】:

我使用file.read() 以块的形式读取数据,在当前示例中,块的大小分别为 100 MB、500MB、1GB 和 2GB。我的文本文件大小为 2.1 GB。

代码:

 from functools import partial

 def read_in_chunks(size_in_bytes):

    s = 'Lets say i have a text file of 1000 GB'
    with open('data.txt', 'r+b') as f:
        prev = ''
        count = 0
        f_read  = partial(f.read, size_in_bytes)
        for text in iter(f_read, ''):
            if not text.endswith('\n'):
                # if file contains a partial line at the end, then don't
                # use it when counting the substring count. 
                text, rest = text.rsplit('\n', 1)
                # pre-pend the previous partial line if any.
                text =  prev + text
                prev = rest
            else:
                # if the text ends with a '\n' then simple pre-pend the
                # previous partial line. 
                text =  prev + text
                prev = ''
            count += text.count(s)
        count += prev.count(s)
        print count

时间安排:

read_in_chunks(104857600)
$ time python so.py
10000000

real    0m1.649s
user    0m0.977s
sys     0m0.669s

read_in_chunks(524288000)
$ time python so.py
10000000

real    0m1.558s
user    0m0.893s
sys     0m0.646s

read_in_chunks(1073741824)
$ time python so.py
10000000

real    0m1.242s
user    0m0.689s
sys     0m0.549s


read_in_chunks(2147483648)
$ time python so.py
10000000

real    0m0.844s
user    0m0.415s
sys     0m0.408s

另一方面,简单循环版本在我的系统上大约需要 6 秒:

def simple_loop():

    s = 'Lets say i have a text file of 1000 GB'
    with open('data.txt') as f:
        print sum(line.count(s) for line in f)

$ time python so.py
10000000

real    0m5.993s
user    0m5.679s
sys     0m0.313s

@SlaterTyranus 的 grep version 在我的文件中的结果:

$ time grep -o 'Lets say i have a text file of 1000 GB' data.txt|wc -l
10000000

real    0m11.975s
user    0m11.779s
sys     0m0.568s

@woot 的solution 的结果:

$ time cat data.txt | parallel --block 10M --pipe grep -o 'Lets\ say\ i\ have\ a\ text\ file\ of\ 1000\ GB' | wc -l
10000000

real    0m5.955s
user    0m14.825s
sys     0m5.766s

当我使用 100 MB 作为块大小时获得了最佳时机:

$ time cat data.txt | parallel --block 100M --pipe grep -o 'Lets\ say\ i\ have\ a\ text\ file\ of\ 1000\ GB' | wc -l
10000000

real    0m4.632s
user    0m13.466s
sys     0m3.290s

woot 的second solution 的结果:

$ time python woot_thread.py # CHUNK_SIZE = 1073741824
10000000

real    0m1.006s
user    0m0.509s
sys     0m2.171s
$ time python woot_thread.py  #CHUNK_SIZE = 2147483648
10000000

real    0m1.009s
user    0m0.495s
sys     0m2.144s

系统规格:Core i5-4670、7200 RPM HDD

【讨论】:

  • 速度要快得多。但是速度受硬盘速度的限制。任何显着压缩文件的方式 - 例如从 1000 GB 到 1 GB。也许使用数据库。
  • @Brana 不知道压缩的东西。 ://
  • 这是迄今为止最快的。是否可以将其添加到数据库中并更快。
  • 值得指出grep 的效率取决于语言环境。在执行grep之前设置LANG=C,你可能会注意到时间上的不同。
  • 这种解决方案称为缓存阻塞。它很快,因为您减少了缓存未命中的次数。尝试根据您的缓存大小使用这种方法。我不知道如何在 python 中找到合适的缓存大小。
【解决方案5】:

这是一个 Python 尝试...您可能需要使用 THREADS 和 CHUNK_SIZE。而且它在短时间内是一堆代码,所以我可能没有想到所有事情。我确实重叠了我的缓冲区以捕获介于两者之间的缓冲区,并且我扩展了最后一个块以包含文件的其余部分。

import os
import threading

INPUTFILE ='bigfile.txt'
SEARCH_STRING='how fast it is'
THREADS = 8  # Set to 2 times number of cores, assuming hyperthreading
CHUNK_SIZE = 32768

FILESIZE = os.path.getsize(INPUTFILE)
SLICE_SIZE = FILESIZE / THREADS



class myThread (threading.Thread):
    def __init__(self, filehandle, seekspot):
        threading.Thread.__init__(self)
        self.filehandle = filehandle
        self.seekspot = seekspot
        self.cnt = 0
    def run(self):
        self.filehandle.seek( self.seekspot )

        p = self.seekspot
        if FILESIZE - self.seekspot < 2 * SLICE_SIZE:
            readend = FILESIZE
        else: 
            readend = self.seekspot + SLICE_SIZE + len(SEARCH_STRING) - 1
        overlap = ''
        while p < readend:
            if readend - p < CHUNK_SIZE:
                buffer = overlap + self.filehandle.read(readend - p)
            else:
                buffer = overlap + self.filehandle.read(CHUNK_SIZE)
            if buffer:
                self.cnt += buffer.count(SEARCH_STRING)
            overlap = buffer[len(buffer)-len(SEARCH_STRING)+1:]
            p += CHUNK_SIZE

filehandles = []
threads = []
for fh_idx in range(0,THREADS):
    filehandles.append(open(INPUTFILE,'rb'))
    seekspot = fh_idx * SLICE_SIZE
    threads.append(myThread(filehandles[fh_idx],seekspot ) )
    threads[fh_idx].start()

totalcount = 0 
for fh_idx in range(0,THREADS):
    threads[fh_idx].join()
    totalcount += threads[fh_idx].cnt

print totalcount

【讨论】:

    【解决方案6】:

    你看过使用parallel/grep吗?

    cat bigfile.txt | parallel --block 10M --pipe grep -o 'how\ fast\ it\ is' | wc -l
    

    【讨论】:

    • 我刚刚在我的 Linux VM 上对其进行了测试,它可以正常工作。你遇到了什么错误?另外,我添加了 -o 和 | wc -l 更正确地回答问题。
    • 我将它提升到 100M 大约需要 4.6 秒(如果我没记错的话是 100 MB),任何高于或低于它的东西都比它慢。
    • 是的,我不知道该设置什么数字。您还可以使用 --j N 超额订阅,其中 N 是内核数的 2 倍,以考虑超线程。
    • 我尝试了--j 8--j 4,但没有明显改善。仍然是 4.6 秒。(我有 4 核处理器)
    • 是的。 parallel 在选择正确的数字方面做得很好,但值得一试。您是否尝试过多线程 Python 方法?我很好奇你的测试时间。
    【解决方案7】:

    我承认 grep 会更快。我假设这个文件是一个基于字符串的大文件。

    但如果你真的想要,你可以做这样的事情。

    import os
    import re
    import mmap
    
    fileName = 'bigfile.txt'
    phrase = re.compile("how fast it is")
    
    with open(fileName, 'r') as fHandle:
        data = mmap.mmap(fHandle.fileno(), os.path.getsize(fileName), access=mmap.ACCESS_READ)
        matches = re.match(phrase, data)
        print('matches = {0}'.format(matches.group()))
    

    【讨论】:

    • 更多解释会更有用。还有一个较小文件的速度比较。正则表达式也非常慢。
    • 这个不加载文件。我测试了它:
    【解决方案8】:

    建议使用 grep 而不是 python。会更快,而且通常如果您在本地计算机上处​​理 1000GB 的文本,您就做错了,但抛开所有判断不谈,grep 提供了几个选项,可以让您的生活更轻松。

    grep -o '<your_phrase>' bigfile.txt|wc -l
    

    具体来说,这将计算您想要的短语出现的行数。这也应该计算单行上的多次出现。

    如果你不需要,你可以这样做:

    grep -c '<your_phrase>' bigfile.txt
    

    【讨论】:

    • @lejlot wikipedia 上的所有文本加起来小于 1000 GB 的多个数量级。这个星球上没有一个文本语料库比几百个演出大,那是谷歌的整个互联网的文本语料库。文字很小。
    • OP 是在问一个典型的谷歌面试问题。谷歌有足够的数据,所以这个问题还是有道理的。
    • @Slater,对不起,你是对的,wiki“只是”44 GB
    • @lejlot 比省略存根、重复和多种语言时要小得多。
    • @Pavel 现在你认为我在推测:)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-01-03
    • 2016-09-02
    • 2021-10-26
    • 1970-01-01
    相关资源
    最近更新 更多