我们讨论的是在相当大的数据流中对特定子字符串进行简单计数。该任务几乎肯定受 I/O 限制,但很容易并行化。第一层是原始读取速度;我们可以选择通过压缩来减少读取量,或者通过将数据存储在多个位置来分配传输速率。然后我们有搜索本身;子字符串搜索是一个众所周知的问题,同样是 I/O 受限。如果数据集来自单个磁盘,那么几乎任何优化都没有实际意义,因为磁盘无法在速度上击败单个内核。
假设我们确实有块,例如,可能是 bzip2 文件的单独块(如果我们使用线程解压缩器)、RAID 中的条带或分布式节点,我们可以从单独处理它们中获益良多。每个块都搜索needle,然后可以通过从一个块的末尾和下一个块的开头获取len(needle)-1,并在其中搜索来形成关节。
快速基准测试表明,正则表达式状态机的运行速度比通常的 in 运算符快:
>>> timeit.timeit("x.search(s)", "s='a'*500000; import re; x=re.compile('foobar')", number=20000)
17.146117210388184
>>> timeit.timeit("'foobar' in s", "s='a'*500000", number=20000)
24.263535976409912
>>> timeit.timeit("n in s", "s='a'*500000; n='foobar'", number=20000)
21.562405109405518
假设我们在文件中有数据,我们可以执行的另一个优化步骤是 mmap 它,而不是使用通常的读取操作。这允许操作系统直接使用磁盘缓冲区。它还允许内核以任意顺序满足多个读取请求,而无需进行额外的系统调用,这使我们能够在多个线程中运行时利用底层 RAID 之类的东西。
这是一个快速拼凑起来的原型。一些事情显然可以改进,例如如果我们有一个多节点集群,则分配块进程,通过将一个传递给相邻的工作人员(在此实现中未知的顺序)来进行尾部+头部检查,而不是将两者都发送到一个特殊的工作者,并实现一个线程间限制队列(管道)类而不是匹配信号量。将工作线程移到主线程函数之外可能也是有意义的,因为主线程不断改变其局部变量。
from mmap import mmap, ALLOCATIONGRANULARITY, ACCESS_READ
from re import compile, escape
from threading import Semaphore, Thread
from collections import deque
def search(needle, filename):
# Might want chunksize=RAID block size, threads
chunksize=ALLOCATIONGRANULARITY*1024
threads=32
# Read chunk allowance
allocchunks=Semaphore(threads) # should maybe be larger
chunkqueue=deque() # Chunks mapped, read by workers
chunksready=Semaphore(0)
headtails=Semaphore(0) # edges between chunks into special worker
headtailq=deque()
sumq=deque() # worker final results
# Note: although we do push and pop at differing ends of the
# queues, we do not actually need to preserve ordering.
def headtailthread():
# Since head+tail is 2*len(needle)-2 long,
# it cannot contain more than one needle
htsum=0
matcher=compile(escape(needle))
heads={}
tails={}
while True:
headtails.acquire()
try:
pos,head,tail=headtailq.popleft()
except IndexError:
break # semaphore signaled without data, end of stream
try:
prevtail=tails.pop(pos-chunksize)
if matcher.search(prevtail+head):
htsum+=1
except KeyError:
heads[pos]=head
try:
nexthead=heads.pop(pos+chunksize)
if matcher.search(tail+nexthead):
htsum+=1
except KeyError:
tails[pos]=tail
# No need to check spill tail and head as they are shorter than needle
sumq.append(htsum)
def chunkthread():
threadsum=0
# escape special characters to achieve fixed string search
matcher=compile(escape(needle))
borderlen=len(needle)-1
while True:
chunksready.acquire()
try:
pos,chunk=chunkqueue.popleft()
except IndexError: # End of stream
break
# Let the re module do the heavy lifting
threadsum+=len(matcher.findall(chunk))
if borderlen>0:
# Extract the end pieces for checking borders
head=chunk[:borderlen]
tail=chunk[-borderlen:]
headtailq.append((pos,head,tail))
headtails.release()
chunk.close()
allocchunks.release() # let main thread allocate another chunk
sumq.append(threadsum)
with infile=open(filename,'rb'):
htt=Thread(target=headtailthread)
htt.start()
chunkthreads=[]
for i in range(threads):
t=Thread(target=chunkthread)
t.start()
chunkthreads.append(t)
pos=0
fileno=infile.fileno()
while True:
allocchunks.acquire()
chunk=mmap(fileno, chunksize, access=ACCESS_READ, offset=pos)
chunkqueue.append((pos,chunk))
chunksready.release()
pos+=chunksize
if pos>chunk.size(): # Last chunk of file?
break
# File ended, finish all chunks
for t in chunkthreads:
chunksready.release() # wake thread so it finishes
for t in chunkthreads:
t.join() # wait for thread to finish
headtails.release() # post event to finish border checker
htt.join()
# All threads finished, collect our sum
return sum(sumq)
if __name__=="__main__":
from sys import argv
print "Found string %d times"%search(*argv[1:])
另外,修改整个事情以使用一些 mapreduce 例程(将块映射到计数、头部和尾部,通过求和计数和检查尾部+头部部分来减少)作为练习。
编辑:由于这个搜索似乎会用不同的针重复,索引会更快,能够跳过已知不匹配的部分的搜索。一种可能性是制作一张地图,其中包含任何出现的各种 n-gram(通过允许 ngram 重叠到下一个来考虑块边界);在需要加载原始数据块之前,可以组合这些地图以找到更复杂的条件。当然有数据库可以做到这一点。寻找全文搜索引擎。