【问题标题】:Parallel file matching, Python并行文件匹配,Python
【发布时间】:2011-11-29 04:59:44
【问题描述】:

我正在尝试改进扫描文件以查找恶意代码的脚本。我们在一个文件中有一个正则表达式模式列表,每行一个模式。这些正则表达式用于 grep,因为我们当前的实现基本上是一个 bash 脚本 find\grep 组合。 bash 脚本在我的基准目录上需要 358 秒。我能够编写一个在 72 秒内完成此操作的 python 脚本,但我想改进更多。首先,我将发布基本代码,然后进行我尝试过的调整:

import os, sys, Queue, threading, re

fileList = []
rootDir = sys.argv[1]

class Recurser(threading.Thread):

    def __init__(self, queue, dir):
    self.queue = queue
    self.dir = dir
    threading.Thread.__init__(self)

    def run(self):
    self.addToQueue(self.dir)

    ## HELPER FUNCTION FOR INTERNAL USE ONLY
    def addToQueue(self,  rootDir):
      for root, subFolders, files in os.walk(rootDir):
    for file in files:
       self.queue.put(os.path.join(root,file))
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)

class Scanner(threading.Thread):

    def __init__(self, queue, patterns):
    self.queue = queue
    self.patterns = patterns
    threading.Thread.__init__(self)

    def run(self):
    nextFile = self.queue.get()
    while nextFile is not -1:
       #print "Trying " + nextFile
       self.scanFile(nextFile)
       nextFile = self.queue.get()


    #HELPER FUNCTION FOR INTERNAL UES ONLY
    def scanFile(self, file):
       fp = open(file)
       contents = fp.read()
       i=0
       #for patt in self.patterns:
       if self.patterns.search(contents):
      print "Match " + str(i) + " found in " + file

############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################


fileQueue = Queue.Queue()

#Get the shell scanner patterns
patterns = []
fPatt = open('/root/patterns')
giantRE = '('
for line in fPatt:
   #patterns.append(re.compile(line.rstrip(), re.IGNORECASE))
   giantRE = giantRE + line.rstrip() + '|'

giantRE = giantRE[:-1] + ')'
giantRE = re.compile(giantRE, re.IGNORECASE)

#start recursing the directories
recurser = Recurser(fileQueue,rootDir)
recurser.start()

print "starting scanner"
#start checking the files
for scanner in xrange(0,8):
   scanner = Scanner(fileQueue, giantRE)
   scanner.start()

这显然是调试\丑陋的代码,千万不要介意queue.put(-1),我稍后会清理这个。一些缩进没有正确显示,尤其是在 scanFile 中。

无论如何,我注意到了一些事情。使用 1、4 甚至 8 个线程(对于 xrange(0,???):) 中的扫描仪并没有什么不同。无论如何,我仍然得到〜72秒。我认为这是由于 python 的 GIL 造成的。

与制作一个巨大的正则表达式相反,我尝试将每一行(模式)作为一个 compilex RE 放在一个列表中,并在我的 scanfile 函数中遍历这个列表。这导致执行时间更长。

为了避免 python 的 GIL,我尝试让每个线程分叉到 grep,如下所示:

#HELPER FUNCTION FOR INTERNAL UES ONLY
def scanFile(self, file):
      s = subprocess.Popen(("grep", "-El", "--file=/root/patterns", file), stdout = subprocess.PIPE)
      output = s.communicate()[0]
      if output != '':
         print 'Matchfound in ' + file

这导致执行时间更长。

关于提高性能的任何建议。

:::::::::::::EDIT:::::::::

我还不能发布我自己问题的答案,但是这里是对提出的几个问题的答案:

@David Nehme - 只是为了让人们知道我知道我有一百万个 queue.put(-1) 的事实

@Blender - 标记队列的底部。我的扫描仪线程一直在出队,直到它们到达底部的 -1(而 nextFile 不是 -1:)。处理器内核为 8,但由于 GIL 使用 1 个线程、4 个线程或 8 个线程并没有区别。生成 8 个子进程导致代码明显变慢(142 秒 vs 72 秒)

@ed - 是的,它和 find\grep 组合一样慢,实际上更慢,因为它不加选择地 greps 不需要的文件

@Ron - 无法升级,这必须是通用的。你认为这会加速 > 72 秒吗? bash grepper 执行 358 秒。我的 python 巨型 RE 方法使用 1-8 个线程执行 72 秒。包含 8 个线程(8 个子进程)的 popen 方法运行时间为 142 秒。到目前为止,巨大的 RE python only 方法是迄今为止明显的赢家

@intuted

这是我们当前的 find\grep 组合的内容(不是我的脚本)。这很简单。里面有一些额外的东西,比如 ls,但没有什么会导致 5 倍的减速。即使 grep -r 稍微高效一点,5x 也是一个巨大的减速。

 find "${TARGET}" -type f -size "${SZLIMIT}" -exec grep -Eaq --file="${HOME}/patterns" "{}" \; -and -ls | tee -a "${HOME}/found.txt"

python代码效率更高,不知道为什么,但我实验测试了一下。我更喜欢在 python 中执行此操作。我已经用 python 实现了 5 倍的加速,我想让它加速更多。

::::::::::::WINNER WINNER WINNER:::::::::::::::::

看来我们赢了。

intued 的 shell 脚本以 34 秒排名第二,但 @steveha 以 24 秒排名第一。由于我们的很多盒子没有python2.6,我不得不cx_freeze它。我可以编写一个 shell 脚本包装器来获取 tar 并解压缩它。不过,为了简单起见,我确实喜欢 intued。

感谢大家的帮助,我现在有了一个高效的系统管理工具

【问题讨论】:

  • 首先,发布您不希望读者介意的代码有什么意义?
  • self.queue.put(-1) 的目的是什么?此外,线程数应该是处理器支持的内核/线程数。你可能有一个单核。
  • 您是否尝试过仅使用一个递归 grep 命令,将全部内容作为长正则表达式?将诸如文件处理之类的事情留给已经优化过的程序(例如 grep)通常是个好主意。
  • ed 的建议,结合所有的正则表达式,意味着 grep(或其他)可以合并正则表达式并在所有正则表达式中做一些共同的工作;您可以看到以这种方式合并正则表达式而不是单独匹配每个正则表达式有很大的加速!
  • 不是 --file=??把它当作一个巨大的正则表达式?对于我的 python 脚本,我已经使用了一个巨大的正则表达式,这在我的代码中很清楚。这可能是我的 python 脚本需要 72 秒的原因之一

标签: python multithreading string parallel-processing gil


【解决方案1】:

如果您愿意升级到 3.2 或更高版本,您可以利用 concurrent.futures.ProcessPoolExecutor。我认为它会比您尝试的 popen 方法提高性能,因为它会预先创建一个进程池,您的 popen 方法每次都会创建一个新进程。如果由于某种原因无法迁移到 3.2,您可以编写自己的代码来为早期版本执行相同的操作。

【讨论】:

    【解决方案2】:

    对于您的 Python 脚本最终如何比您的 find/grep 组合更快,我感到有些困惑。如果您想以类似于 Ron Smith 在他的回答中建议的方式使用grep,您可以执行类似的操作

    find -type f | xargs -d \\n -P 8 -n 100 grep --file=/root/patterns
    

    启动grep 进程,该进程将在退出前处理 100 个文件,同时保持多达 8 个此类进程处于活动状态。让他们处理 100 个文件应该可以使每个文件的进程启动开销时间可以忽略不计。

    注意xargs-d \\n 选项是一个 GNU 扩展,它不适用于所有 POSIX-ish 系统。它指定文件名之间的 *d*elimiter 是换行符。尽管从技术上讲,文件名可以包含换行符,但实际上没有人这样做并保留他们的工作。为了与非 GNU xargs 兼容,您需要将 -print0 选项添加到 find 并使用 -0 而不是 -d \\nxargs。这将安排空字节\0(十六进制0x00)用作findxargs 的分隔符。

    你也可以采取先统计要被grep的文件个数的方法

    NUMFILES="$(find -type f | wc -l)";
    

    然后使用该数字在 8 个进程之间进行平均分配(假设 bash 作为 shell)

    find -type f | xargs -d \\n -P 8 -n $(($NUMFILES / 8 + 1)) grep --file=/root/patterns
    

    我认为这可能会更好,因为find 的磁盘 I/O 不会干扰各种greps 的磁盘 I/O。我想这部分取决于文件有多大,以及它们是否连续存储——对于小文件,磁盘无论如何都会寻找很多,所以没那么重要。另请注意,特别是如果您有相当数量的 RAM,则此类命令的后续运行会更快,因为某些文件将保存在您的内存缓存中。

    当然,您可以参数化8,以便更轻松地尝试不同数量的并发进程。

    作为编辑。在 cmets 中提到,这种方法的性能很可能仍然不如单进程 grep -r 的性能那么令人印象深刻。我想这取决于您的磁盘 [阵列] 的相对速度、系统中的处理器数量等。

    【讨论】:

    • 这里是我们当前 find\grep 组合的内容(不是我的脚本)。这很简单。里面有一些额外的东西,比如 ls,但没有什么会导致 5 倍的减速。即使 grep -r 效率稍高一些,5x 也是一个巨大的减速。 find "${TARGET}" -type f -size "${SZLIMIT}" -exec grep -Eaq --file="${HOME}/patterns" "{}" \; -和-ls | tee -a "${HOME}/found.txt"
    • 和你一样,我是 Python 的粉丝。我不想劝你不要使用它。但是@intuited 向您展示了如何大幅加快find 解决方案的速度:使用xargs 构建长命令行供grep 浏览。他正在启动 8 个grep 进程,每个进程一次处理 100 个文件;您正在为每个文件启动一个 grep 进程,并经常加上一个 tee 进程。启动和关闭所有这些进程的开销肯定是您发现 Python 更快的原因。
    【解决方案3】:

    我认为,您应该为 Python 解决方案使用 multiprocessing 模块,而不是使用 threading 模块。 Python 线程可能会与 GIL 发生冲突;如果您只是有多个 Python 进程在运行,那么 GIL 不是问题。

    我认为你正在做的工作进程池正是你想要的。默认情况下,池将默认为系统处理器中的每个内核一个进程。只需调用 .map() 方法,其中包含要检查的文件名列表和执行检查的函数。

    http://docs.python.org/library/multiprocessing.html

    如果这不比您的 threading 实现快,那么我认为 GIL 不是您的问题。

    编辑:好的,我正在添加一个工作 Python 程序。这使用一个工作进程池来打开每个文件并在每个文件中搜索模式。当工作人员找到匹配的文件名时,它会简单地将其打印(到标准输出),这样您就可以将此脚本的输出重定向到一个文件中,并获得您的文件列表。

    编辑:我认为这是一个更容易阅读的版本,更容易理解。

    我对此进行了计时,在我的计算机上搜索 /usr/include 中的文件。它在大约半秒内完成搜索。使用find 通过xargs 管道运行尽可能少的grep 进程,大约需要0.05 秒,大约是10 倍的加速。但我讨厌你必须使用的巴洛克式怪异语言才能让find 正常工作,我喜欢 Python 版本。也许在非常大的目录上,差异会更小,因为 Python 半秒的一部分必须是启动时间。对于大多数用途来说,半秒可能已经足够快了!

    import multiprocessing as mp
    import os
    import re
    import sys
    
    from stat import S_ISREG
    
    
    # uncomment these if you really want a hard-coded $HOME/patterns file
    #home = os.environ.get('HOME')
    #patterns_file = os.path.join(home, 'patterns')
    
    target = sys.argv[1]
    size_limit = int(sys.argv[2])
    assert size_limit >= 0
    patterns_file = sys.argv[3]
    
    
    # build s_pat as string like:  (?:foo|bar|baz)
    # This will match any of the sub-patterns foo, bar, or baz
    # but the '?:' means Python won't bother to build a "match group".
    with open(patterns_file) as f:
        s_pat = r'(?:{})'.format('|'.join(line.strip() for line in f))
    
    # pre-compile pattern for speed
    pat = re.compile(s_pat)
    
    
    def walk_files(topdir):
        """yield up full pathname for each file in tree under topdir"""
        for dirpath, dirnames, filenames in os.walk(topdir):
            for fname in filenames:
                pathname = os.path.join(dirpath, fname)
                yield pathname
    
    def files_to_search(topdir):
        """yield up full pathname for only files we want to search"""
        for fname in walk_files(topdir):
            try:
                # if it is a regular file and big enough, we want to search it
                sr = os.stat(fname)
                if S_ISREG(sr.st_mode) and sr.st_size >= size_limit:
                    yield fname
            except OSError:
                pass
    
    def worker_search_fn(fname):
        with open(fname, 'rt') as f:
            # read one line at a time from file
            for line in f:
                if re.search(pat, line):
                    # found a match! print filename to stdout
                    print(fname)
                    # stop reading file; just return
                    return
    
    mp.Pool().map(worker_search_fn, files_to_search(target))
    

    【讨论】:

    • 你是老板!!谢谢。您的脚本运行速度最快,但我确实遇到了 2.6 兼容性问题。经过一些修改后,我将其 CX_Freezed 并编写一个 bash 包装器来下载 tar 并调用它。
    • 我很高兴它对你有用!如果你希望它在 2.4 或 2.5 上运行,你可以试试这个:pypi.python.org/pypi/multiprocessing
    【解决方案4】:

    让我也向您展示如何在Ray 中执行此操作,这是一个用于编写并行 Python 应用程序的开源框架。这种方法的优点是速度快,易于编写和扩展(比如你想在任务之间传递大量数据或做一些有状态的积累),并且无需修改也可以在集群或云上运行。它在利用单台机器上的所有内核(即使对于像 100 个内核这样的大型机器)和任务之间的数据传输方面也非常有效。

    import os
    import ray
    import re
    
    ray.init()
    
    patterns_file = os.path.expanduser("~/patterns")
    topdir = os.path.expanduser("~/folder")
    
    with open(patterns_file) as f:
        s_pat = r'(?:{})'.format('|'.join(line.strip() for line in f))
    
    regex = re.compile(s_pat)
    
    @ray.remote
    def match(pattern, fname):
        results = []
        with open(fname, 'rt') as f:
            for line in f:
                if re.search(pattern, line):
                    results.append(fname)
        return results
    
    results = []
    for dirpath, dirnames, filenames in os.walk(topdir):
        for fname in filenames:
            pathname = os.path.join(dirpath, fname)
            results.append(match.remote(regex, pathname))
    
    print("matched files", ray.get(results))
    

    documentatation 中提供了更多信息,包括如何在集群或云上运行它

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2014-09-02
      • 1970-01-01
      • 2015-12-22
      • 2018-07-12
      • 1970-01-01
      • 2019-07-28
      • 2020-06-12
      相关资源
      最近更新 更多