【问题标题】:Why is my multithreaded parser not multithreading?为什么我的多线程解析器不是多线程的?
【发布时间】:2016-02-24 12:19:19
【问题描述】:

我有一个使用 ElementTree Path Evaluator 解析 xml 文件的脚本。它工作正常,但它需要很长时间才能完成。所以我尝试做一个多线程的实现:

import fnmatch
import operator
import os
import lxml.etree
from nltk import FreqDist
from nltk.corpus import stopwords
from collections import defaultdict
from datetime import datetime
import threading
import Queue

STOPWORDS = stopwords.words('dutch')
STOPWORDS.extend(stopwords.words('english'))
DIR_NAME = 'A_DIRNAME'
PATTERN = '*.A_PATTERN'

def loadData(dir_name, pattern):
    nohyphen_files = []
    dir_names = []
    dir_paths = []
    for root, dirnames, filenames in os.walk(dir_name):
        dir_names.append(dirnames)
        dir_paths.append(root)
        for filename in fnmatch.filter(filenames, pattern):
            nohyphen_files.append(os.path.join(root, filename))
    return nohyphen_files, dir_names, dir_paths

def freq(element_list, descending = True):
    agglomerated = defaultdict(int)
    for e in element_list:
        agglomerated[e] += 1
    return sorted(agglomerated.items(), key=operator.itemgetter(1), reverse=descending)

def lexDiv(amount_words):
    return 1.0*len(set(amount_words))/len(amount_words)

def anotherFreq(list_types, list_words):
    fd = FreqDist(list_types)
    print 'top 10 most frequent types:'
    for t, freq in fd.items()[:10]:
        print t, freq
    print '\ntop 10 most frequent words:'
    agglomerated = defaultdict(int)
    for w in list_words:
        if not w.lower() in STOPWORDS:
            agglomerated[w] += 1
    sorted_dict = sorted(agglomerated.items(), key=operator.itemgetter(1),reverse=True)
    print sorted_dict[:10]

def extractor(f):
    print "check file: {}".format(f)
    try:
        # doc = lxml.etree.ElementTree(lxml.etree.XML(f))
        doc = lxml.etree.ElementTree(file=f)
    except lxml.etree.XMLSyntaxError, e:
        print e
        return
    doc_evaluator = lxml.etree.XPathEvaluator(doc)
    entities = doc_evaluator('//entity/*/externalRef/@reference')
    places_dbpedia = doc_evaluator('//entity[contains(@type, "Schema:Place")]/*/externalRef/@reference')
    non_people_dbpedia = set(doc_evaluator('//entity[not(contains(@type, "Schema:Person"))]'))
    people = doc_evaluator('//entity[contains(@type, "Schema:Person")]/*/externalRef/@reference')
    words = doc.xpath('text/wf[re:match(text(), "[A-Za-z-]")]/text()',\
        namespaces={"re": "http://exslt.org/regular-expressions"})
    unique_words = set(words)
    other_tokens = doc.xpath('text/wf[re:match(text(), "[^A-Za-z-]")]/text()',\
        namespaces={"re": "http://exslt.org/regular-expressions"})
    amount_of_sentences = doc_evaluator('text/wf/@sent')[-1]
    types = doc_evaluator('//term/@morphofeat')
    longest_sentence = freq(doc.xpath('text/wf[re:match(text(), "[A-Za-z-]")]/@sent',\
        namespaces={"re": "http://exslt.org/regular-expressions"}))[0]

    top_people = freq([e.split('/')[-1] for e in people])[:10]
    top_entities = freq([e.split('/')[-1] for e in entities])[:10]
    top_places = freq([e.split('/')[-1] for e in places_dbpedia])[:10]

def worker():
    while 1:
        job_number = q.get()
        extractor(job_number)
        q.task_done() #this thread is complete, move on

if __name__ =='__main__':
    startTime = datetime.now()
    files, dirs, path = loadData(DIR_NAME, PATTERN)
    startTime = datetime.now()

q = Queue.Queue()# job queue

for f in files:
    q.put(f)

for i in range(20): #make 20 workerthreads ready
    worker_thread = threading.Thread(target=worker)
    worker_thread.daemon = True
    worker_thread.start()

q.join()
print datetime.now() - startTime

这有一些作用,但是在计时时,它并不比普通版本快。我认为这与打开和读取文件有关,使线程不是多线程的。如果我使用一个函数而不是解析 xml 文件只是休眠几秒钟并打印一些东西,它确实可以工作并且速度要快得多。拥有多线程 XML 解析器需要注意什么?

【问题讨论】:

  • 使用threading 只会使您编写的代码并行化。它实际上并没有让它穿过 CPU 中的核心(如果我在这里错了,请纠正我)。而且从单个驱动器读取也将是一个瓶颈,因为磁盘本身在给定时间只能处理这么多的 I/O。您最多可以从并行化(?!?)代码中获得几秒钟甚至几分钟的时间。您需要的是更好的存储量,并且可能需要一种缓存机制以加快读取速度。如果可能,请先尝试将文件读入 RAM 或数据库作为缓存,然后再使用它们。或者 RAID 你的磁盘。
  • @Torxed 我的目标不是让它跨越 CPU 的内核。并行化是我的目标,我想要说明的是,运行一个执行某种解析的函数会使整个编程阻塞。所以假设函数提取器只是休眠一秒钟并打印一些东西然后它可以工作,但是在解析 xml 文件时它不能以并行方式工作。你知道它是否与 XPathEvaluator 有关,是否有解决方法?

标签: python xpath lxml elementtree python-multithreading


【解决方案1】:

Python 中的线程不像在其他语言中那样工作。它依赖于全局解释器锁,确保一次只有一个线程处于活动状态(准确地说是运行字节码)。
您想要做的是改用多进程库。
您可以在此处阅读有关 GIL 和线程的更多信息:
https://docs.python.org/2/glossary.html#term-global-interpreter-lock
https://docs.python.org/2/library/threading.html

【讨论】:

  • 系统实例化线程而不是进程会更便宜。我唯一想做的就是可以被视为多线程网络爬虫的东西。我正在做的事情可以看作是收集文件的路径作为收集 url 并并行化我的代码以同时在不同的文件上执行相同的功能。所以我不明白为什么诸如睡眠两秒钟然后打印一些东西的功能会通过多线程和解析 XML 文件来工作。你能给我解释一下吗?
  • @Soufyan Belkaid 在 Python 中执行字节码可以通过以下方式来简化基本的对象操作。进程中任何需要等待的线程都会让位于另一个线程继续执行。针对真实线程推理because performance suffered in the common single-processor case. It is believed that overcoming this performance issue would make the implementation much more complicated and therefore costlier to maintain.
猜你喜欢
  • 2015-07-09
  • 2013-05-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-06-20
  • 2012-06-22
相关资源
最近更新 更多